[hornetq-commits] JBoss hornetq SVN: r9993 - in trunk: src/main/org/hornetq/api/core and 18 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sun Dec 5 05:00:14 EST 2010
Author: gaohoward
Date: 2010-12-05 05:00:09 -0500 (Sun, 05 Dec 2010)
New Revision: 9993
Added:
trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
trunk/src/main/org/hornetq/utils/DeflaterReader.java
trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
trunk/src/main/org/hornetq/utils/InflaterReader.java
trunk/src/main/org/hornetq/utils/InflaterWriter.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java
Modified:
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-448
Large Message Compression Impl
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2010-12-05 10:00:09 UTC (rev 9993)
@@ -75,7 +75,11 @@
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:long"
maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
+ <xsd:element name="compress-large-messages" type="xsd:boolean"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
+
<xsd:element name="client-id" type="xsd:string"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -65,6 +65,8 @@
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");
public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");
+
+ public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");
public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -138,4 +138,7 @@
CoreRemotingConnection getConnection();
+ void setCompressLargeMessages(boolean compressLargeMessage);
+
+ boolean isCompressLargeMessages();
}
Modified: trunk/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/api/core/client/HornetQClient.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -43,6 +43,8 @@
// Any message beyond this size is considered a large message (to be sent in chunks)
public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;
+
+ public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;
public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -29,6 +29,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.utils.DecompressedLargeMessageBuffer;
import org.hornetq.utils.Future;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
@@ -454,6 +455,11 @@
// ClientConsumerInternal implementation
// --------------------------------------------------------------
+ public ClientSessionInternal getSession()
+ {
+ return session;
+ }
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
@@ -554,7 +560,14 @@
currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
- currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ if (currentChunkMessage.isCompressed())
+ {
+ currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer));
+ }
+ else
+ {
+ currentChunkMessage.setBuffer(currentLargeMessageBuffer);
+ }
currentChunkMessage.setFlowControlSize(0);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -67,4 +67,6 @@
void start();
SessionQueueQueryResponseMessage getQueueInfo();
+
+ ClientSessionInternal getSession();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -21,11 +21,11 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.utils.DataConstants;
/**
*
@@ -117,6 +117,11 @@
{
return largeMessage;
}
+
+ public boolean isCompressed()
+ {
+ return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
+ }
/**
* @param largeMessage the largeMessage to set
@@ -142,7 +147,6 @@
"]";
}
- // FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
@@ -150,7 +154,7 @@
{
if (largeMessage)
{
- ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
+ ((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
}
else
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -44,4 +44,6 @@
void discardLargeBody();
void setBuffer(HornetQBuffer buffer);
+
+ boolean isCompressed();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -22,12 +22,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -150,7 +151,7 @@
{
return;
}
-
+
doCleanup();
}
@@ -190,7 +191,7 @@
{
return credits;
}
-
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
@@ -203,7 +204,7 @@
{
session.returnCredits(address);
}
-
+
session.removeProducer(this);
closed = true;
@@ -212,12 +213,13 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
MessageInternal msgI = (MessageInternal)msg;
-
+
ClientProducerCredits theCredits;
-
+
boolean isLarge;
- if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
+ msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
@@ -236,7 +238,7 @@
{
msg.setAddress(address);
}
-
+
// Anonymous
theCredits = session.getCredits(address, true);
}
@@ -250,7 +252,7 @@
{
msg.setAddress(this.address);
}
-
+
theCredits = credits;
}
@@ -270,8 +272,6 @@
session.workDone();
-
-
if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
@@ -322,8 +322,16 @@
* @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking,
+ final MessageInternal msgI,
+ final ClientProducerCredits credits) throws HornetQException
{
+
+ if (session.isCompressLargeMessages())
+ {
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
+ }
+
int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
@@ -341,7 +349,6 @@
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
msgI.encodeHeadersAndProperties(headerBuffer);
-
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
channel.send(initialChunk);
@@ -358,7 +365,7 @@
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input, credits);
+ largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
else
{
@@ -375,72 +382,29 @@
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msgI.getBodyEncoder();
-
- final long bodySize = context.getLargeBodySize();
-
- context.open();
- try
- {
-
- for (int pos = 0; pos < bodySize;)
- {
- final boolean lastChunk;
-
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
-
- final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);
-
- context.encode(bodyBuffer, chunkLength);
-
- pos += chunkLength;
-
- lastChunk = pos >= bodySize;
-
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
- .array(),
- !lastChunk,
- lastChunk && sendBlocking);
-
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
-
- try
- {
- credits.acquireCredits(chunk.getPacketSize());
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- context.close();
- }
+ msgI.getBodyBuffer().readerIndex(0);
+ largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
}
/**
- * TODO: This method could be eliminated and
- * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
- * All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
- final InputStream input,
+ final MessageInternal msgI,
+ final InputStream inputStreamParameter,
final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;
+ InputStream input = inputStreamParameter;
+
+ if (session.isCompressLargeMessages())
+ {
+ input = new DeflaterReader(inputStreamParameter);
+ }
+
while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.*;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
@@ -146,6 +147,8 @@
public final Exception e = new Exception();
private final Object waitLock = new Object();
+
+ private boolean compressLargeMessages;
// Static
// ---------------------------------------------------------------------------------------
@@ -202,6 +205,8 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
+
+ compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
}
@@ -768,6 +773,7 @@
serverLocator.isBlockOnDurableSend(),
serverLocator.isCacheLargeMessagesClient(),
serverLocator.getMinLargeMessageSize(),
+ compressLargeMessages,
serverLocator.getInitialMessagePacketSize(),
serverLocator.getGroupID(),
connection,
@@ -1358,4 +1364,14 @@
cancelled = true;
}
}
+
+ public void setCompressLargeMessages(boolean compressLargeMessage)
+ {
+ this.compressLargeMessages = compressLargeMessage;
+ }
+
+ public boolean isCompressLargeMessages()
+ {
+ return this.compressLargeMessages;
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -155,6 +155,8 @@
private final boolean blockOnDurableSend;
private final int minLargeMessageSize;
+
+ private final boolean compressLargeMessages;
private volatile int initialMessagePacketSize;
@@ -207,6 +209,7 @@
final boolean blockOnDurableSend,
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int initialMessagePacketSize,
final String groupID,
final CoreRemotingConnection remotingConnection,
@@ -259,6 +262,8 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
+
+ this.compressLargeMessages = compressLargeMessages;
this.initialMessagePacketSize = initialMessagePacketSize;
@@ -269,7 +274,7 @@
// ClientSession implementation
// -----------------------------------------------------------------
-
+
public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
{
internalCreateQueue(address, queueName, null, false, false);
@@ -684,6 +689,11 @@
{
return minLargeMessageSize;
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessages;
+ }
/**
* @return the cacheLargeMessageClient
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -39,6 +39,8 @@
boolean isCacheLargeMessageClient();
int getMinLargeMessageSize();
+
+ boolean isCompressLargeMessages();
void expire(long consumerID, long messageID) throws HornetQException;
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -561,4 +561,9 @@
{
session.addMetaData(key, data);
}
+
+ public boolean isCompressLargeMessages()
+ {
+ return session.isCompressLargeMessages();
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
+
+/**
+ * A FailoverManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 27 Nov 2008 18:45:46
+ *
+ *
+ */
+public interface FailoverManager
+{
+ ClientSession createSession(final String username,
+ final String password,
+ final boolean xa,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
+ final int minLargeMessageSize,
+ final boolean compressLargeMessages,
+ final boolean blockOnAcknowledge,
+ final boolean autoGroup,
+ final int confirmationWindowSize,
+ final int producerWindowSize,
+ final int consumerWindowSize,
+ final int producerMaxRate,
+ final int consumerMaxRate,
+ final boolean blockOnNonDurableSend,
+ final boolean blockOnDurableSend,
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException;
+
+ void removeSession(final ClientSessionInternal session);
+
+ public CoreRemotingConnection getConnection();
+
+ int numConnections();
+
+ int numSessions();
+
+ void addFailureListener(SessionFailureListener listener);
+
+ boolean removeFailureListener(SessionFailureListener listener);
+
+ void causeExit();
+
+}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -287,7 +287,6 @@
*/
public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
{
-
if (outStream == null)
{
// There is no stream.. it will never achieve the end of streaming
@@ -1258,11 +1257,12 @@
{
try
{
+ output.write(packet.getBody());
if (!packet.isContinues())
{
streamEnded = true;
+ output.close();
}
- output.write(packet.getBody());
}
catch (IOException e)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferInternal.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
/**
* A LargeMessageBufferInternal
@@ -55,6 +56,8 @@
* Saves this buffer to the specified output.
*/
void saveBuffer(final OutputStream output) throws HornetQException;
+
+ public void addPacket(final SessionReceiveContinuationMessage packet);
/**
* Waits for the completion for the specified waiting time (in milliseconds).
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -177,6 +177,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessage,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
@@ -211,6 +212,7 @@
long callTimeout,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
+ boolean compressLargeMessages,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -69,6 +69,10 @@
int getMinLargeMessageSize();
void setMinLargeMessageSize(int minLargeMessageSize);
+
+ boolean isCompressLargeMessages();
+
+ void setCompressLargeMessages(boolean compress);
int getConsumerWindowSize();
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -60,6 +60,8 @@
private boolean cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
private int minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ private boolean compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
private int consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
@@ -248,6 +250,16 @@
this.minLargeMessageSize = minLargeMessageSize;
}
+ public boolean isCompressLargeMessages()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessages(final boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
public int getConsumerWindowSize()
{
return consumerWindowSize;
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -293,6 +293,11 @@
"min-large-message-size",
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
Validators.GT_ZERO);
+
+ boolean compressLargeMessages = XMLConfigurationUtil.getBoolean(e,
+ "compress-large-messages",
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES);
+
boolean blockOnAcknowledge = XMLConfigurationUtil.getBoolean(e,
"block-on-acknowledge",
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
@@ -417,6 +422,7 @@
cfConfig.setCallTimeout(callTimeout);
cfConfig.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cfConfig.setMinLargeMessageSize(minLargeMessageSize);
+ cfConfig.setCompressLargeMessages(compressLargeMessages);
cfConfig.setConsumerWindowSize(consumerWindowSize);
cfConfig.setConsumerMaxRate(consumerMaxRate);
cfConfig.setConfirmationWindowSize(confirmationWindowSize);
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -18,7 +18,6 @@
import javax.naming.Context;
import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import javax.transaction.xa.Xid;
@@ -723,6 +722,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessage,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -761,6 +761,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessage);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
@@ -797,6 +798,7 @@
final long callTimeout,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
+ final boolean compressLargeMessages,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
@@ -836,6 +838,7 @@
configuration.setCallTimeout(callTimeout);
configuration.setCacheLargeMessagesClient(cacheLargeMessagesClient);
configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
configuration.setConsumerWindowSize(consumerWindowSize);
configuration.setConsumerMaxRate(consumerMaxRate);
configuration.setConfirmationWindowSize(confirmationWindowSize);
Added: trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,1091 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.LargeMessageBufferInternal;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
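+ * A DecompressedLargeMessageBuffer: a read-only buffer that inflates the compressed body of the wrapped large-message buffer as it is read.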
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DecompressedLargeMessageBuffer implements LargeMessageBufferInternal
+{
+
+ // Constants -----------------------------------------------------
+
+ private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
+
+ private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
+
+ // Attributes ----------------------------------------------------
+
+ final LargeMessageBufferInternal bufferDelegate;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate)
+ {
+ this.bufferDelegate = bufferDelegate;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ /**
+ *
+ */
+ public void discardUnusedPackets()
+ {
+ bufferDelegate.discardUnusedPackets();
+ }
+
+ /**
+ * Add a buff to the List, or save it to the OutputStream if set
+ * @param packet
+ */
+ public void addPacket(final SessionReceiveContinuationMessage packet)
+ {
+ bufferDelegate.addPacket(packet);
+ }
+
+ public synchronized void cancel()
+ {
+ bufferDelegate.cancel();
+ }
+
+ // Forwards to the delegate's cancel(), exactly as cancel() does.
+ // NOTE(review): confirm that cancelling (rather than a dedicated close on the delegate) is intended here.
+ public synchronized void close()
+ {
+ bufferDelegate.cancel();
+ }
+
+ public void setOutputStream(final OutputStream output) throws HornetQException
+ {
+ bufferDelegate.setOutputStream(new InflaterWriter(output));
+ }
+
+ public synchronized void saveBuffer(final OutputStream output) throws HornetQException
+ {
+ setOutputStream(output);
+ waitCompletion(0);
+ }
+
+ /**
+ *
+ * @param timeWait Milliseconds to Wait. 0 means forever
+ * @throws Exception
+ */
+ public synchronized boolean waitCompletion(final long timeWait) throws HornetQException
+ {
+ return bufferDelegate.waitCompletion(timeWait);
+ }
+
+ // Channel Buffer Implementation ---------------------------------
+
+   /**
+    * Not supported: this class holds only a delegate buffer and a decompressing stream,
+    * so there is no backing array to expose.
+    *
+    * @return never returns normally
+    * @throws IllegalAccessError always
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#array()
+    */
+   public byte[] array()
+   {
+      // The message previously named LargeMessageBufferImpl, which misattributed the failure.
+      throw new IllegalAccessError("array not supported on DecompressedLargeMessageBuffer");
+   }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#capacity()
+ */
+ public int capacity()
+ {
+ return -1;
+ }
+
+   DataInputStream dataInput = null;
+
+   /**
+    * Returns the stream used by all sequential read operations, creating it on first use.
+    * The stream reads the delegate buffer through a HornetQBufferInputStream and inflates
+    * it with an InflaterReader.
+    *
+    * @return the shared decompressing data stream
+    * @throws RuntimeException wrapping any exception raised while building the stream
+    */
+   private DataInputStream getStream()
+   {
+      if (dataInput != null)
+      {
+         return dataInput;
+      }
+
+      try
+      {
+         final InputStream compressed = new HornetQBufferInputStream(bufferDelegate);
+         dataInput = new DataInputStream(new InflaterReader(compressed));
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException (e.getMessage(), e);
+      }
+      return dataInput;
+   }
+
+ private void positioningNotSupported()
+ {
+ throw new IllegalStateException("Position not supported over compressed large messages");
+ }
+
+ public byte readByte()
+ {
+ try
+ {
+ return getStream().readByte();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException (e.getMessage(), e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getByte(int)
+ */
+ public byte getByte(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ private byte getByte(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final int index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void getBytes(final long index, final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, byte[], int, int)
+ */
+ public void getBytes(final int index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final byte[] dst, final int dstIndex, final int length)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.ByteBuffer)
+ */
+ public void getBytes(final int index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final ByteBuffer dst)
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.io.OutputStream, int)
+ */
+ public void getBytes(final int index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ public void getBytes(final long index, final OutputStream out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getBytes(int, java.nio.channels.GatheringByteChannel, int)
+ */
+ public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getInt(int)
+ */
+ public int getInt(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public int getInt(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getLong(int)
+ */
+ public long getLong(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long getLong(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+   /**
+    * Absolute reads are not available on a compressed stream.
+    *
+    * @param index ignored
+    * @return never returns normally
+    * @throws IllegalStateException always
+    * @see org.hornetq.api.core.buffers.ChannelBuffer#getShort(int)
+    */
+   public short getShort(final int index)
+   {
+      positioningNotSupported();
+      return 0;
+   }
+
+   /**
+    * Long-indexed variant of {@link #getShort(int)}; equally unsupported.
+    *
+    * @param index ignored
+    * @return never returns normally
+    * @throws IllegalStateException always
+    */
+   public short getShort(final long index)
+   {
+      // Fail directly, like the other absolute getters. The previous byte-combining
+      // expression could never complete because getByte(long) always throws.
+      positioningNotSupported();
+      return 0;
+   }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#getUnsignedMedium(int)
+ */
+ public int getUnsignedMedium(final int index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+
+
+ public int getUnsignedMedium(final long index)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setByte(int, byte)
+ */
+ public void setByte(final int index, final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, org.hornetq.api.core.buffers.ChannelBuffer, int, int)
+ */
+ public void setBytes(final int index, final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, byte[], int, int)
+ */
+ public void setBytes(final int index, final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.ByteBuffer)
+ */
+ public void setBytes(final int index, final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.io.InputStream, int)
+ */
+ public int setBytes(final int index, final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setBytes(int, java.nio.channels.ScatteringByteChannel, int)
+ */
+ public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setInt(int, int)
+ */
+ public void setInt(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setLong(int, long)
+ */
+ public void setLong(final int index, final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setMedium(int, int)
+ */
+ public void setMedium(final int index, final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#setShort(int, short)
+ */
+ public void setShort(final int index, final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toByteBuffer(int, int)
+ */
+ public ByteBuffer toByteBuffer(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#toString(int, int, java.lang.String)
+ */
+ public String toString(final int index, final int length, final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int readerIndex()
+ {
+ return 0;
+ }
+
+ public void readerIndex(final int readerIndex)
+ {
+ // TODO
+ }
+
+ public int writerIndex()
+ {
+ // TODO
+ return 0;
+ }
+
+ public long getSize()
+ {
+ // TODO
+ return 0;
+ }
+
+ public void writerIndex(final int writerIndex)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setIndex(final int readerIndex, final int writerIndex)
+ {
+ positioningNotSupported();
+ }
+
+ public void clear()
+ {
+ }
+
+ public boolean readable()
+ {
+ return true;
+ }
+
+ public boolean writable()
+ {
+ return false;
+ }
+
+ public int readableBytes()
+ {
+ return 1;
+ }
+
+ public int writableBytes()
+ {
+ return 0;
+ }
+
+ public void markReaderIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetReaderIndex()
+ {
+ // TODO: reset positioning if possible
+ }
+
+ public void markWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void resetWriterIndex()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void discardReadBytes()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short getUnsignedByte(final int index)
+ {
+ return (short)(getByte(index) & 0xFF);
+ }
+
+ public int getUnsignedShort(final int index)
+ {
+ return getShort(index) & 0xFFFF;
+ }
+
+ public int getMedium(final int index)
+ {
+ int value = getUnsignedMedium(index);
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+ public long getUnsignedInt(final int index)
+ {
+ return getInt(index) & 0xFFFFFFFFL;
+ }
+
+ public void getBytes(int index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(long index, final byte[] dst)
+ {
+ // TODO: optimize this by using System.arraycopy
+ for (int i = 0; i < dst.length; i++)
+ {
+ dst[i] = getByte(index++);
+ }
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst)
+ {
+ getBytes(index, dst, dst.writableBytes());
+ }
+
+ public void getBytes(final int index, final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ getBytes(index, dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void setBytes(final int index, final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setBytes(final int index, final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setZero(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public short readUnsignedByte()
+ {
+ try
+ {
+ return (short)getStream().readUnsignedByte();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public short readShort()
+ {
+ try
+ {
+ return (short)getStream().readShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readUnsignedShort()
+ {
+ try
+ {
+ return (int)getStream().readUnsignedShort();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException (e.getMessage(), e);
+ }
+ }
+
+ public int readMedium()
+ {
+ int value = readUnsignedMedium();
+ if ((value & 0x800000) != 0)
+ {
+ value |= 0xff000000;
+ }
+ return value;
+ }
+
+
+ public int readUnsignedMedium()
+ {
+ return (readByte() & 0xff) << 16 | (readByte() & 0xff) << 8 | (readByte() & 0xff) << 0;
+ }
+
+ public int readInt()
+ {
+ try
+ {
+ return getStream().readInt();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public int readInt(final int pos)
+ {
+ positioningNotSupported();
+ return 0;
+ }
+
+ public long readUnsignedInt()
+ {
+ return readInt() & 0xFFFFFFFFL;
+ }
+
+ public long readLong()
+ {
+ try
+ {
+ return getStream().readLong();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+   /**
+    * Reads exactly {@code length} decompressed bytes into {@code dst}, starting at {@code dstIndex}.
+    * <p>
+    * A single {@code read} call may deliver fewer bytes than requested (or -1 at end of stream);
+    * its result used to be ignored, which left {@code dst} silently part-filled.
+    * {@link DataInputStream#readFully(byte[], int, int)} either fills the requested range or fails.
+    *
+    * @param dst destination array
+    * @param dstIndex first index of {@code dst} to write to
+    * @param length number of bytes to read
+    * @throws IllegalStateException wrapping any failure, including the stream ending before
+    *         {@code length} bytes were available
+    */
+   public void readBytes(final byte[] dst, final int dstIndex, final int length)
+   {
+      try
+      {
+         getStream().readFully(dst, dstIndex, length);
+      }
+      catch (Exception e)
+      {
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+
+ public void readBytes(final byte[] dst)
+ {
+ readBytes(dst, 0, dst.length);
+ }
+
+ public void readBytes(final HornetQBuffer dst)
+ {
+ readBytes(dst, dst.writableBytes());
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int length)
+ {
+ if (length > dst.writableBytes())
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ readBytes(dst, dst.writerIndex(), length);
+ dst.writerIndex(dst.writerIndex() + length);
+ }
+
+ public void readBytes(final HornetQBuffer dst, final int dstIndex, final int length)
+ {
+ byte[] destBytes = new byte[length];
+ readBytes(destBytes);
+ dst.setBytes(dstIndex, destBytes);
+ }
+
+ public void readBytes(final ByteBuffer dst)
+ {
+ byte bytesToGet[] = new byte[dst.remaining()];
+ readBytes(bytesToGet);
+ dst.put(bytesToGet);
+ }
+
+ public int readBytes(final GatheringByteChannel out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ public void readBytes(final OutputStream out, final int length) throws IOException
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+ /**
+ * Skips {@code length} decompressed bytes by reading them one at a time and discarding them.
+ * Any exception raised while reading is rethrown wrapped in an IllegalStateException.
+ * NOTE(review): the value returned by read() is not checked, so reaching the end of the
+ * stream before {@code length} bytes were skipped goes unreported - confirm this is acceptable.
+ *
+ * @param length number of bytes to discard
+ */
+ public void skipBytes(final int length)
+ {
+
+ try
+ {
+ for (int i = 0 ; i < length; i++)
+ {
+ getStream().read();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public void writeByte(final byte value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeShort(final short value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeMedium(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeInt(final int value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeLong(final long value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final byte[] src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final ByteBuffer src)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final InputStream in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public int writeBytes(final ScatteringByteChannel in, final int length) throws IOException
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeZero(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public ByteBuffer[] toByteBuffers(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public String toString(final String charsetName)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public Object getUnderlyingBuffer()
+ {
+ return this;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readBoolean()
+ */
+ public boolean readBoolean()
+ {
+ return readByte() != 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readChar()
+ */
+ public char readChar()
+ {
+ return (char)readShort();
+ }
+
+ public char getChar(final int index)
+ {
+ return (char)getShort(index);
+ }
+
+ public double getDouble(final int index)
+ {
+ return Double.longBitsToDouble(getLong(index));
+ }
+
+ public float getFloat(final int index)
+ {
+ return Float.intBitsToFloat(getInt(index));
+ }
+
+ public HornetQBuffer readBytes(final int length)
+ {
+ byte bytesToGet[] = new byte[length];
+ readBytes(bytesToGet);
+ return HornetQBuffers.wrappedBuffer(bytesToGet);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
+ */
+ public double readDouble()
+ {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readFloat()
+ */
+ public float readFloat()
+ {
+ return Float.intBitsToFloat(readInt());
+ }
+
+   /**
+    * Reads a marker byte and, unless it equals {@code DataConstants.NULL}, the SimpleString that follows.
+    *
+    * @return the decoded value, or {@code null} when the marker byte equals {@code DataConstants.NULL}
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableSimpleString()
+    */
+   public SimpleString readNullableSimpleString()
+   {
+      final int marker = readByte();
+      return marker == DataConstants.NULL ? null : readSimpleString();
+   }
+
+   /**
+    * Reads a marker byte and, unless it equals {@code DataConstants.NULL}, the String that follows.
+    *
+    * @return the decoded value, or {@code null} when the marker byte equals {@code DataConstants.NULL}
+    * @see org.hornetq.spi.core.remoting.HornetQBuffer#readNullableString()
+    */
+   public String readNullableString()
+   {
+      final int marker = readByte();
+      return marker == DataConstants.NULL ? null : readString();
+   }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readSimpleString()
+ */
+ public SimpleString readSimpleString()
+ {
+ int len = readInt();
+ byte[] data = new byte[len];
+ readBytes(data);
+ return new SimpleString(data);
+ }
+
+ /**
+ * Reads a String whose encoding is selected by a leading int:
+ * below 9 the characters follow as that many 16-bit values; below 0xfff the rest is
+ * decoded by readUTF(); otherwise it is decoded by readSimpleString() and converted.
+ * NOTE(review): in the last two branches the int read here is not passed on, and
+ * readSimpleString() reads its own int length - presumably the writer emits a second
+ * length prefix; confirm against the matching writeString implementation.
+ *
+ * @return the decoded string
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readString()
+ */
+ public String readString()
+ {
+ int len = readInt();
+
+ // Short strings: one readShort() per character.
+ if (len < 9)
+ {
+ char[] chars = new char[len];
+ for (int i = 0; i < len; i++)
+ {
+ chars[i] = (char)readShort();
+ }
+ return new String(chars);
+ }
+ // Medium strings: delegated to the UTF reader.
+ else if (len < 0xfff)
+ {
+ return readUTF();
+ }
+ // Everything else: read as a SimpleString and convert.
+ else
+ {
+ return readSimpleString().toString();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#readUTF()
+ */
+ public String readUTF()
+ {
+ return UTF8Util.readUTF(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeBoolean(boolean)
+ */
+ public void writeBoolean(final boolean val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeChar(char)
+ */
+ public void writeChar(final char val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeDouble(double)
+ */
+ public void writeDouble(final double val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeFloat(float)
+ */
+ public void writeFloat(final float val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeNullableSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeNullableString(java.lang.String)
+ */
+ public void writeNullableString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeSimpleString(org.hornetq.util.SimpleString)
+ */
+ public void writeSimpleString(final SimpleString val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeString(java.lang.String)
+ */
+ public void writeString(final String val)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.HornetQBuffer#writeUTF(java.lang.String)
+ */
+ public void writeUTF(final String utf)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.buffers.ChannelBuffer#compareTo(org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public int compareTo(final HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+ public HornetQBuffer copy()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public HornetQBuffer slice(final int index, final int length)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ /**
+ * @param body
+ */
+ // Inner classes -------------------------------------------------
+
+ public ChannelBuffer channelBuffer()
+ {
+ return null;
+ }
+
+ public HornetQBuffer copy(final int index, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer duplicate()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer readSlice(final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setChar(final int index, final char value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setDouble(final int index, final double value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void setFloat(final int index, final float value)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public HornetQBuffer slice()
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+
+ public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
+ {
+ throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+ }
+}
Added: trunk/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/DeflaterReader.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.Deflater;
+
+/**
+ * A DeflaterReader
+ * The reader takes an input stream and compresses its content as it is read.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DeflaterReader extends InputStream
+{
+   private final Deflater deflater = new Deflater();
+
+   /** Set once the source is exhausted and {@link Deflater#finish()} has been called. */
+   private boolean isFinished = false;
+
+   /** Set once all compressed output has been handed out (or the reader was closed). */
+   private boolean compressDone = false;
+
+   private final InputStream input;
+
+   /**
+    * @param inData the stream providing the uncompressed data
+    */
+   public DeflaterReader(InputStream inData)
+   {
+      input = inData;
+   }
+
+   /**
+    * Reads a single compressed byte.
+    *
+    * @return the byte as a value in 0..255, or -1 at the end of the compressed data
+    * @throws IOException if the underlying stream fails
+    */
+   @Override
+   public int read() throws IOException
+   {
+      byte[] buffer = new byte[1];
+      int n = read(buffer, 0, 1);
+      if (n == 1)
+      {
+         return buffer[0] & 0xFF;
+      }
+      if (n == -1 || n == 0)
+      {
+         return -1;
+      }
+      throw new IOException("Error reading data, invalid n: " + n);
+   }
+
+   /**
+    * Try to fill the buffer with compressed bytes. Except the last effective read,
+    * this method always returns with a full buffer of compressed data.
+    *
+    * @param buffer the buffer to fill compressed bytes
+    * @param offset index in {@code buffer} of the first byte to write
+    * @param len maximum number of bytes to write
+    * @return the number of bytes really filled; -1 indicates the end of the compressed data;
+    *         0 is returned only when {@code len} is 0
+    * @throws IOException if the underlying stream fails
+    */
+   @Override
+   public int read(byte[] buffer, int offset, int len) throws IOException
+   {
+      if (compressDone)
+      {
+         return -1;
+      }
+
+      //buffer for reading input stream
+      byte[] readBuffer = new byte[2 * len];
+
+      int read = 0;
+
+      while (len > 0)
+      {
+         int n = deflater.deflate(buffer, offset, len);
+         if (n > 0)
+         {
+            read += n;
+            offset += n;
+            len -= n;
+            continue;
+         }
+
+         // No output was produced: either everything is flushed, or the deflater needs feeding.
+         if (isFinished)
+         {
+            deflater.end();
+            compressDone = true;
+            break;
+         }
+
+         if (deflater.needsInput())
+         {
+            // read some data from inputstream
+            int m = input.read(readBuffer);
+
+            if (m == -1)
+            {
+               deflater.finish();
+               isFinished = true;
+            }
+            else
+            {
+               deflater.setInput(readBuffer, 0, m);
+            }
+         }
+         else
+         {
+            deflater.finish();
+            isFinished = true;
+         }
+      }
+
+      // InputStream contract: a read with len > 0 reports end of stream as -1, never as 0.
+      // This used to return 0 when the previous call had consumed the last compressed bytes exactly.
+      if (read == 0 && compressDone)
+      {
+         return -1;
+      }
+      return read;
+   }
+
+   /**
+    * Releases the deflater's native resources, so that a reader abandoned before the end of the
+    * data does not hold on to them. The wrapped input stream is deliberately left open, as before.
+    * Subsequent reads return -1.
+    */
+   @Override
+   public void close() throws IOException
+   {
+      deflater.end();
+      compressDone = true;
+   }
+
+}
Added: trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java
===================================================================
--- trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/HornetQBufferInputStream.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * An {@link InputStream} view over a {@link HornetQBuffer}.
+ * Used to send large messages.
+ * <p>
+ * The number of bytes still available is computed as {@code writerIndex() - readerIndex()}
+ * of the wrapped buffer. Closing the stream only drops the reference to the buffer; every
+ * later read, skip or available call then fails with an {@link IOException}.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStream extends InputStream
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** The wrapped buffer; {@code null} once the stream has been closed. */
+   private HornetQBuffer bb;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @param paramByteBuffer the buffer to read from
+    */
+   public HornetQBufferInputStream(final HornetQBuffer paramByteBuffer)
+   {
+      bb = paramByteBuffer;
+   }
+
+   /**
+    * @return the next byte as a value in 0..255, or -1 when no bytes remain
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read() throws IOException
+   {
+      ensureOpen("read");
+
+      if (remainingBytes() == 0)
+      {
+         return -1;
+      }
+      return bb.readByte() & 0xFF;
+   }
+
+   /**
+    * Equivalent to {@code read(byteArray, 0, byteArray.length)}.
+    *
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int read(final byte[] byteArray) throws IOException
+   {
+      // Checked here as well so that a closed stream is reported before byteArray is dereferenced.
+      ensureOpen("read");
+
+      return read(byteArray, 0, byteArray.length);
+   }
+
+   /**
+    * Copies up to {@code len} remaining bytes into {@code byteArray}.
+    *
+    * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when no bytes remain
+    * @throws IOException if the stream has been closed
+    * @throws NullPointerException if {@code byteArray} is null
+    * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a range inside {@code byteArray}
+    */
+   @Override
+   public int read(final byte[] byteArray, final int off, final int len) throws IOException
+   {
+      ensureOpen("read");
+
+      if (byteArray == null)
+      {
+         throw new NullPointerException();
+      }
+      if (off < 0 || off > byteArray.length || len < 0 || off + len > byteArray.length || off + len < 0)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      if (len == 0)
+      {
+         return 0;
+      }
+
+      final int size = Math.min(remainingBytes(), len);
+      if (size == 0)
+      {
+         return -1;
+      }
+
+      bb.readBytes(byteArray, off, size);
+      return size;
+   }
+
+   /**
+    * Skips up to {@code len} of the remaining bytes.
+    *
+    * @return the number of bytes actually skipped; 0 when {@code len} is not positive
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public long skip(final long len) throws IOException
+   {
+      ensureOpen("skip");
+
+      if (len <= 0L)
+      {
+         return 0L;
+      }
+
+      // Clamp in long arithmetic. Casting len to int before the comparison truncated values
+      // above Integer.MAX_VALUE, which could yield a wrong or even negative skip count.
+      final int size = (int)Math.min((long)remainingBytes(), len);
+
+      bb.skipBytes(size);
+
+      return size;
+   }
+
+   /**
+    * @return the number of bytes that can still be read
+    * @throws IOException if the stream has been closed
+    */
+   @Override
+   public int available() throws IOException
+   {
+      ensureOpen("available");
+
+      return remainingBytes();
+   }
+
+   /** Drops the reference to the wrapped buffer; the buffer itself is not touched. */
+   @Override
+   public void close() throws IOException
+   {
+      bb = null;
+   }
+
+   /** Does nothing: mark/reset is not supported. */
+   @Override
+   public synchronized void mark(final int paramInt)
+   {
+   }
+
+   /**
+    * @throws IOException always, since mark/reset is not supported
+    */
+   @Override
+   public synchronized void reset() throws IOException
+   {
+      throw new IOException("mark/reset not supported");
+   }
+
+   @Override
+   public boolean markSupported()
+   {
+      return false;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Fails when the stream has been closed.
+    *
+    * @param operation name of the calling operation, used as the prefix of the error message
+    * @throws IOException if {@link #close()} has been called
+    */
+   private void ensureOpen(final String operation) throws IOException
+   {
+      if (bb == null)
+      {
+         throw new IOException(operation + " on a closed InputStream");
+      }
+   }
+
+   /**
+    * @return the distance between the buffer's writer and reader indexes
+    */
+   private int remainingBytes()
+   {
+      return bb.writerIndex() - bb.readerIndex();
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+}
Added: trunk/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterReader.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterReader.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * An InflaterReader
+ * It takes an compressed input stream and decompressed it as it is being read.
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public class InflaterReader extends InputStream
+{
+ private Inflater inflater = new Inflater();
+
+ private InputStream input;
+
+ private byte[] readBuffer;
+ private int pointer;
+ private int length;
+
+ public InflaterReader(InputStream input)
+ {
+ this(input, 1024);
+ }
+
+ public InflaterReader(InputStream input, int bufferSize)
+ {
+ this.input = input;
+ this.readBuffer = new byte[bufferSize];
+ this.pointer = -1;
+ }
+
+ public int read() throws IOException
+ {
+ if (pointer == -1)
+ {
+ try
+ {
+ length = doRead(readBuffer, 0, readBuffer.length);
+ if (length == 0)
+ {
+ return -1;
+ }
+ pointer = 0;
+ }
+ catch (DataFormatException e)
+ {
+ throw new IOException(e);
+ }
+ }
+
+ int value = readBuffer[pointer] & 0xFF;
+ pointer++;
+ if (pointer == length)
+ {
+ pointer = -1;
+ }
+
+ return value;
+ }
+
+ /*
+ * feed inflater more bytes in order to get some
+ * decompressed output.
+ * returns number of bytes actually got
+ */
+ private int doRead(byte[] buf, int offset, int len) throws DataFormatException, IOException
+ {
+ int read = 0;
+ int n = 0;
+ byte[] inputBuffer = new byte[len];
+
+ while (len > 0)
+ {
+ n = inflater.inflate(buf, offset, len);
+ if (n == 0)
+ {
+ if (inflater.finished())
+ {
+ break;
+ }
+ else if (inflater.needsInput())
+ {
+ //feeding
+ int m = input.read(inputBuffer);
+
+ if (m == -1)
+ {
+ //it shouldn't be here, throw exception
+ throw new DataFormatException("Input is over while inflater still expecting data");
+ }
+ else
+ {
+ //feed the data in
+ inflater.setInput(inputBuffer);
+ n = inflater.inflate(buf, offset, len);
+ if (n > 0)
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ }
+ else
+ {
+ //it shouldn't be here, throw
+ throw new DataFormatException("Inflater is neither finished nor needing input.");
+ }
+ }
+ else
+ {
+ read += n;
+ offset += n;
+ len -= n;
+ }
+ }
+ return read;
+ }
+
+}
Added: trunk/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- trunk/src/main/org/hornetq/utils/InflaterWriter.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/InflaterWriter.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+/**
+ * A InflaterWriter
+ *
+ * This class takes an OutputStream. Compressed bytes
+ * can directly be written into this class. The class will
+ * decompress the bytes and write them to the output stream.
+ *
+ * Not for concurrent use.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+ public class InflaterWriter extends OutputStream
+ {
+    /** Decompressor fed with the compressed bytes written to this stream. */
+    private Inflater inflater = new Inflater();
+
+    /** Destination of the decompressed bytes. */
+    private OutputStream output;
+
+    /** Collects compressed bytes until a full chunk can be inflated. */
+    private byte[] writeBuffer = new byte[1024];
+
+    /** Number of compressed bytes currently waiting in writeBuffer. */
+    private int writePointer = 0;
+
+    /** Scratch space receiving decompressed bytes before they are forwarded. */
+    private byte[] outputBuffer = new byte[writeBuffer.length*2];
+
+    /** Set once close() has run so that repeated calls are harmless. */
+    private boolean closed = false;
+
+    /**
+     * @param output the stream receiving the decompressed data; it is closed
+     *               when this writer is closed
+     */
+    public InflaterWriter(OutputStream output)
+    {
+       this.output = output;
+    }
+
+    /*
+     * Write a compressed byte.
+     * The byte is buffered; decompression happens whenever the buffer fills up.
+     */
+    @Override
+    public void write(int b) throws IOException
+    {
+       writeBuffer[writePointer] = (byte)(b & 0xFF);
+       writePointer++;
+
+       if (writePointer == writeBuffer.length)
+       {
+          writePointer = 0;
+          try
+          {
+             doWrite(writeBuffer.length);
+          }
+          catch (DataFormatException e)
+          {
+             throw new IOException("Error decompressing data", e);
+          }
+       }
+    }
+
+    /**
+     * Decompresses whatever is still buffered, then releases the inflater and
+     * closes the wrapped stream. The wrapped stream is closed even when no
+     * bytes are pending, and calling this method again has no effect.
+     *
+     * @throws IOException if the pending data is malformed or the wrapped stream fails
+     */
+    @Override
+    public void close() throws IOException
+    {
+       if (closed)
+       {
+          return;
+       }
+       closed = true;
+
+       try
+       {
+          if (writePointer > 0)
+          {
+             int pending = writePointer;
+             writePointer = 0;
+             doWrite(pending);
+          }
+       }
+       catch (DataFormatException e)
+       {
+          throw new IOException("Error decompressing data", e);
+       }
+       finally
+       {
+          // free the inflater's resources and always close the target stream
+          inflater.end();
+          output.close();
+       }
+    }
+
+    /*
+     * Inflates the first count bytes of writeBuffer and forwards every
+     * decompressed chunk to the wrapped stream.
+     */
+    private void doWrite(int count) throws DataFormatException, IOException
+    {
+       inflater.setInput(writeBuffer, 0, count);
+       int n = inflater.inflate(outputBuffer);
+
+       while (n > 0)
+       {
+          output.write(outputBuffer, 0, n);
+          n = inflater.inflate(outputBuffer);
+       }
+    }
+
+ }
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2010-12-05 10:00:09 UTC (rev 9993)
@@ -20,6 +20,7 @@
<producer-window-size>7712652</producer-window-size>
<producer-max-rate>789</producer-max-rate>
<min-large-message-size>12</min-large-message-size>
+ <compress-large-messages>true</compress-large-messages>
<client-id>TestClientID</client-id>
<dups-ok-batch-size>3456</dups-ok-batch-size>
<transaction-batch-size>4567</transaction-batch-size>
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -68,6 +68,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -14,15 +14,10 @@
package org.hornetq.jms.tests;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import javax.naming.InitialContext;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import org.hornetq.jms.client.HornetQQueueConnectionFactory;
import org.hornetq.jms.client.HornetQTopicConnectionFactory;
@@ -81,6 +76,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -115,6 +111,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
@@ -149,6 +146,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -303,6 +303,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
prefetchSize,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Copied: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+
+/**
+ * A LargeMessageCompressTest
+ *
+ * Just extend the LargeMessageTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ *
+ */
+ public class LargeMessageCompressTest extends LargeMessageTest
+ {
+    // Constructors --------------------------------------------------
+
+    protected boolean isNetty()
+    {
+       return false;
+    }
+
+    /**
+     * Creates a session factory with large message compression switched on, so
+     * the inherited tests run against compressed large messages.
+     */
+    @Override
+    protected ClientSessionFactory createSessionFactory() throws Exception
+    {
+       ClientSessionFactory sf = locator.createSessionFactory();
+       sf.setCompressLargeMessages(true);
+       return sf;
+    }
+
+    /**
+     * Sends a compressed large message and reads it back byte by byte through
+     * the body buffer.
+     */
+    @Override
+    public void testLargeMessageCompression() throws Exception
+    {
+       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+       ClientSession session = null;
+
+       try
+       {
+          session = startServerAndCreateCompressingSession();
+
+          ClientConsumer consumer = sendLargeMessageAndCreateConsumer(session, messageSize);
+          ClientMessage msg1 = consumer.receive(1000);
+          Assert.assertNotNull(msg1);
+
+          for (int i = 0 ; i < messageSize; i++)
+          {
+             byte b = msg1.getBodyBuffer().readByte();
+             assertEquals("position = " + i, getSamplebyte(i), b);
+          }
+
+          acknowledgeAndClose(session, consumer, msg1);
+
+          validateNoFilesOnLargeDir();
+       }
+       finally
+       {
+          stopServerAndCloseQuietly(session);
+       }
+    }
+
+    /**
+     * Sends a compressed large message and receives it through an output
+     * stream set on the message, waiting for the transfer to complete.
+     */
+    public void testLargeMessageCompression2() throws Exception
+    {
+       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+       ClientSession session = null;
+
+       try
+       {
+          session = startServerAndCreateCompressingSession();
+
+          ClientConsumer consumer = sendLargeMessageAndCreateConsumer(session, messageSize);
+          ClientMessage msg1 = consumer.receive(1000);
+          Assert.assertNotNull(msg1);
+
+          String testDir = this.getTestDir();
+          File testFile = new File(testDir, "async_large_message");
+          FileOutputStream output = new FileOutputStream(testFile);
+          try
+          {
+             msg1.setOutputStream(output);
+
+             msg1.waitOutputStreamCompletion(0);
+          }
+          finally
+          {
+             // make sure the file handle is released before the file is read and deleted
+             output.close();
+          }
+
+          acknowledgeAndClose(session, consumer, msg1);
+
+          //verify
+          assertFileHoldsSampleBytes(testFile, messageSize);
+
+          testFile.delete();
+          validateNoFilesOnLargeDir();
+       }
+       finally
+       {
+          stopServerAndCloseQuietly(session);
+       }
+    }
+
+    /**
+     * Sends a compressed large message and saves it synchronously to an
+     * output stream.
+     */
+    public void testLargeMessageCompression3() throws Exception
+    {
+       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+       ClientSession session = null;
+
+       try
+       {
+          session = startServerAndCreateCompressingSession();
+
+          ClientConsumer consumer = sendLargeMessageAndCreateConsumer(session, messageSize);
+          ClientMessage msg1 = consumer.receive(1000);
+          Assert.assertNotNull(msg1);
+
+          String testDir = this.getTestDir();
+          File testFile = new File(testDir, "async_large_message");
+          FileOutputStream output = new FileOutputStream(testFile);
+          try
+          {
+             msg1.saveToOutputStream(output);
+          }
+          finally
+          {
+             // make sure the file handle is released before the file is read and deleted
+             output.close();
+          }
+
+          acknowledgeAndClose(session, consumer, msg1);
+
+          //verify
+          assertFileHoldsSampleBytes(testFile, messageSize);
+
+          testFile.delete();
+          validateNoFilesOnLargeDir();
+       }
+       finally
+       {
+          stopServerAndCloseQuietly(session);
+       }
+    }
+
+
+    // below are large message tests that are not applied to compressed messages
+
+    public void testResendSmallStreamMessage() throws Exception
+    {
+    }
+
+    public void testResendLargeStreamMessage() throws Exception
+    {
+    }
+
+    public void testResendCachedSmallStreamMessage() throws Exception
+    {
+    }
+
+    public void testResendCachedLargeStreamMessage() throws Exception
+    {
+    }
+
+    public void testSimpleRollback() throws Exception
+    {
+    }
+
+    public void testSimpleRollbackXA() throws Exception
+    {
+    }
+
+    // Private -------------------------------------------------------
+
+    /** Starts a fresh server and opens a session whose factory compresses large messages. */
+    private ClientSession startServerAndCreateCompressingSession() throws Exception
+    {
+       server = createServer(true, isNetty());
+
+       server.start();
+
+       ClientSessionFactory sf = locator.createSessionFactory();
+       sf.setCompressLargeMessages(true);
+
+       return sf.createSession(false, false, false);
+    }
+
+    /** Sends one large message of the given size on a temporary queue and returns a consumer on it. */
+    private ClientConsumer sendLargeMessageAndCreateConsumer(ClientSession session, int messageSize) throws Exception
+    {
+       session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+       ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+       Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+       producer.send(clientFile);
+
+       session.commit();
+
+       session.start();
+
+       return session.createConsumer(LargeMessageTest.ADDRESS);
+    }
+
+    /** Acknowledges the message, commits, and closes the consumer and the session. */
+    private void acknowledgeAndClose(ClientSession session, ClientConsumer consumer, ClientMessage msg) throws Exception
+    {
+       msg.acknowledge();
+
+       session.commit();
+
+       consumer.close();
+
+       session.close();
+    }
+
+    /** Checks that the first size bytes of the file match the sample byte sequence. */
+    private void assertFileHoldsSampleBytes(File file, int size) throws Exception
+    {
+       FileInputStream input = new FileInputStream(file);
+       try
+       {
+          for (int i = 0 ; i < size; i++)
+          {
+             byte b = (byte)input.read();
+             assertEquals("position = " + i, getSamplebyte(i), b);
+          }
+       }
+       finally
+       {
+          input.close();
+       }
+    }
+
+    /** Best-effort cleanup: stops the server and closes the session, ignoring any failure. */
+    private void stopServerAndCloseQuietly(ClientSession session)
+    {
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+    }
+
+ }
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -59,7 +59,7 @@
// Static --------------------------------------------------------
private final Logger log = Logger.getLogger(LargeMessageTest.class);
- private ServerLocator locator;
+ protected ServerLocator locator;
// Constructors --------------------------------------------------
@@ -70,6 +70,11 @@
return false;
}
+ /**
+  * Creates the session factory used by the tests in this class.
+  * Protected so that a subclass can supply a differently configured factory;
+  * LargeMessageCompressTest overrides it to enable large message compression.
+  *
+  * @return a new session factory obtained from the locator
+  */
+ protected ClientSessionFactory createSessionFactory() throws Exception
+ {
+ return locator.createSessionFactory();
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -82,7 +87,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -153,7 +158,7 @@
public void doTestLargeBuffer(boolean transacted) throws Exception
{
final int journalsize = 100 * 1024;
- final int messageSize = 3 * journalsize;
+ final int messageSize = 3 * journalsize + 5;
// final int messageSize = 5 * 1024;
ClientSession session = null;
@@ -170,7 +175,9 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
+
+ sf.setCompressLargeMessages(true);
session = sf.createSession(!transacted, !transacted, 0);
@@ -254,7 +261,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -307,7 +314,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -381,7 +388,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -464,7 +471,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -599,7 +606,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
SimpleString ADDRESS_DLA = LargeMessageTest.ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = LargeMessageTest.ADDRESS.concat("-expiry");
@@ -675,7 +682,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -742,7 +749,7 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -784,7 +791,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -849,7 +856,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -887,6 +894,7 @@
}
catch (Throwable e)
{
+ log.error("failed", e);
failed = true;
}
@@ -955,7 +963,7 @@
locator.setCacheLargeMessagesClient(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(false, false, false);
@@ -1887,7 +1895,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1970,7 +1978,7 @@
SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
@@ -1998,7 +2006,7 @@
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
}
@@ -2052,7 +2060,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(isXA, false, false);
@@ -2082,7 +2090,7 @@
session.close();
server.stop();
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
session = sf.createSession(isXA, false, false);
session.rollback(xid);
@@ -2139,7 +2147,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(isXA, false, false);
@@ -2276,7 +2284,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2380,7 +2388,7 @@
locator.setMinLargeMessageSize(1024);
locator.setConsumerWindowSize(1024 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, false, false, false, 0);
@@ -2483,7 +2491,7 @@
locator.setMinLargeMessageSize(100 * 1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2557,7 +2565,7 @@
locator.setMinLargeMessageSize(1024);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -2633,7 +2641,7 @@
server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
ClientSession session = sf.createSession(false, false);
@@ -2687,6 +2695,77 @@
}
}
+ /**
+  * Sends a large message with compression enabled on the session factory and
+  * checks that the consumer reads back every byte of the original payload.
+  */
+ public void testLargeMessageCompression() throws Exception
+ {
+    final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true, isNetty());
+
+       server.start();
+
+       ClientSessionFactory sf = createSessionFactory();
+       sf.setCompressLargeMessages(true);
+
+       session = sf.createSession(false, false, false);
+
+       session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+       ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+       Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+       producer.send(clientFile);
+
+       session.commit();
+
+       session.start();
+
+       ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+       ClientMessage msg1 = consumer.receive(1000);
+       Assert.assertNotNull(msg1);
+
+       for (int i = 0 ; i < messageSize; i++)
+       {
+          byte b = msg1.getBodyBuffer().readByte();
+          assertEquals("position = " + i, getSamplebyte(i), b);
+       }
+
+       msg1.acknowledge();
+       session.commit();
+
+       consumer.close();
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+
+       // session stays null when the setup fails before it is created
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+    }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -2731,7 +2810,7 @@
try
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory();
if (sendBlocking)
{
@@ -2776,7 +2855,7 @@
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
- sf = locator.createSessionFactory();
+ sf = createSessionFactory();
}
session = sf.createSession(null, null, false, true, true, false, 0);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -136,6 +136,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -214,6 +214,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -307,6 +307,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -76,6 +76,7 @@
HornetQClient.DEFAULT_CALL_TIMEOUT,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -242,6 +242,7 @@
callTimeout,
true,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -154,6 +154,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/config/JMSServerConfigParserTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -74,6 +74,7 @@
assertEquals(7712652, cfConfig.getProducerWindowSize());
assertEquals(789, cfConfig.getProducerMaxRate());
assertEquals(12, cfConfig.getMinLargeMessageSize());
+ assertEquals(true, cfConfig.isCompressLargeMessages());
assertEquals("TestClientID", cfConfig.getClientID());
assertEquals(3456, cfConfig.getDupsOKBatchSize());
assertEquals(4567, cfConfig.getTransactionBatchSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -15,6 +15,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
@@ -35,12 +36,14 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientMessageInternal;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
/**
* A LargeMessageBufferUnitTest
@@ -56,7 +59,7 @@
// Attributes ----------------------------------------------------
- static int tmpFileCounter = 0;
+ static int tmpFileCounter = 0;
// Static --------------------------------------------------------
@@ -67,13 +70,13 @@
protected void setUp() throws Exception
{
super.setUp();
-
+
tmpFileCounter++;
File tmp = new File(getTestDir());
tmp.mkdirs();
}
-
+
protected void tearDown() throws Exception
{
super.tearDown();
@@ -166,6 +169,20 @@
}
}
+   /**
+    * Reads the buffer built by {@code createBufferWithIntegers(3, 1..15)} through a
+    * {@link HornetQBufferInputStream} wrapped in a {@link DataInputStream} and checks that the
+    * values 1 to 15 come back in order via {@code readInt()}, followed by end-of-stream (-1).
+    */
+   public void testReadIntegersOverStream() throws Exception
+   {
+      final int lastValue = 15;
+
+      DataInputStream in = new DataInputStream(new HornetQBufferInputStream(createBufferWithIntegers(3,
+                                                                                                       1, 2, 3, 4, 5,
+                                                                                                       6, 7, 8, 9, 10,
+                                                                                                       11, 12, 13, 14, 15)));
+
+      int expected = 1;
+      while (expected <= lastValue)
+      {
+         Assert.assertEquals(expected, in.readInt());
+         expected++;
+      }
+
+      // Nothing may be left once all fifteen values have been consumed
+      assertEquals(-1, in.read());
+   }
+
// testing void getBytes(int index, ChannelBuffer dst, int dstIndex, int length)
public void testReadLongs() throws Exception
{
@@ -186,6 +203,20 @@
}
}
+   /**
+    * Reads the buffer built by {@code createBufferWithLongs(3, 1..15)} through a
+    * {@link HornetQBufferInputStream} wrapped in a {@link DataInputStream} and checks that the
+    * values 1 to 15 come back in order via {@code readLong()}, followed by end-of-stream (-1).
+    */
+   public void testReadLongsOverStream() throws Exception
+   {
+      final int lastValue = 15;
+
+      DataInputStream in = new DataInputStream(new HornetQBufferInputStream(createBufferWithLongs(3,
+                                                                                                    1, 2, 3, 4, 5,
+                                                                                                    6, 7, 8, 9, 10,
+                                                                                                    11, 12, 13, 14, 15)));
+
+      int expected = 1;
+      while (expected <= lastValue)
+      {
+         Assert.assertEquals(expected, in.readLong());
+         expected++;
+      }
+
+      // Nothing may be left once all fifteen values have been consumed
+      assertEquals(-1, in.read());
+   }
+
public void testReadData() throws Exception
{
HornetQBuffer dynamic = HornetQBuffers.dynamicBuffer(1);
@@ -315,14 +346,14 @@
Assert.assertEquals(i, bytes[i]);
}
}
-
+
public void testSplitBufferOnFile() throws Exception
{
LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
- 1024 * 1024,
- 1,
- getTestFile(),
- 1024);
+ 1024 * 1024,
+ 1,
+ getTestFile(),
+ 1024);
try
{
@@ -525,6 +556,36 @@
}
+   /**
+    * Streams 1024 sample bytes, obtained from {@code splitBuffer(3, byteArray)}, through a
+    * {@link HornetQBufferInputStream}: the first 100 bytes are read one at a time, the rest in
+    * chunks of up to 10 bytes.
+    * <p>
+    * Every bulk read must return exactly the number of bytes still expected (10, or the shorter
+    * tail on the last iteration). Without that check a premature {@code -1} or {@code 0} would
+    * skip the per-byte comparison and let the test pass vacuously.
+    */
+   public void testReadBytesOnStreaming() throws Exception
+   {
+      byte[] byteArray = new byte[1024];
+      for (int i = 0; i < byteArray.length; i++)
+      {
+         byteArray[i] = getSamplebyte(i);
+      }
+
+      HornetQBuffer splitbuffer = splitBuffer(3, byteArray);
+
+      HornetQBufferInputStream is = new HornetQBufferInputStream(splitbuffer);
+
+      // Phase 1: single-byte reads
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), (byte)is.read());
+      }
+
+      // Phase 2: bulk reads; the scratch array is reused because only the first 'size' bytes are inspected
+      byte[] readBytes = new byte[10];
+
+      for (int i = 100; i < byteArray.length; i += readBytes.length)
+      {
+         int size = is.read(readBytes);
+
+         assertEquals(Math.min(readBytes.length, byteArray.length - i), size);
+
+         for (int j = 0; j < size; j++)
+         {
+            assertEquals(getSamplebyte(i + j), readBytes[j]);
+         }
+      }
+
+      // The whole payload has been consumed
+      assertEquals(-1, is.read());
+   }
+
/**
* @return
*/
@@ -795,6 +856,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientConsumerInternal#getSession()
+ */
+ public ClientSessionInternal getSession()
+ {
+ // Stub implementation: always returns null.
+ return null;
+ }
+
}
}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.zip.Deflater;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.InflaterReader;
+import org.hornetq.utils.InflaterWriter;
+
+/**
+ * Unit tests for the stream compression helpers {@link DeflaterReader}, {@link InflaterReader}
+ * and {@link InflaterWriter}, using the JDK's {@link Deflater} to produce the reference
+ * compressed data.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class CompressionUtilTest extends UnitTestCase
+{
+   /** Sample payload shared by all tests. */
+   private static final String INPUT_STRING = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+
+   /** Charset used both to encode the payload and to decode the inflated bytes. */
+   private static final String CHARSET = "UTF-8";
+
+   /**
+    * Compresses the payload by pulling single bytes from a {@link DeflaterReader} and compares
+    * the result with the output of a plain {@link Deflater}.
+    */
+   public void testDeflaterReader() throws Exception
+   {
+      byte[] input = INPUT_STRING.getBytes(CHARSET);
+
+      DeflaterReader reader = new DeflaterReader(new ByteArrayInputStream(input));
+
+      ByteArrayOutputStream zipHolder = new ByteArrayOutputStream();
+      for (int b = reader.read(); b != -1; b = reader.read())
+      {
+         zipHolder.write(b);
+      }
+
+      byte[] expected = deflate(input);
+
+      compareByteArray(zipHolder.toByteArray(), expected, expected.length);
+   }
+
+   /**
+    * Same as {@link #testDeflaterReader()} but pulls the compressed data in 7-byte chunks
+    * through the array-based {@code read}.
+    */
+   public void testDeflaterReader2() throws Exception
+   {
+      byte[] input = INPUT_STRING.getBytes(CHARSET);
+
+      DeflaterReader reader = new DeflaterReader(new ByteArrayInputStream(input));
+
+      byte[] buffer = new byte[7];
+      ByteArrayOutputStream zipHolder = new ByteArrayOutputStream();
+      for (int n = reader.read(buffer); n != -1; n = reader.read(buffer))
+      {
+         zipHolder.write(buffer, 0, n);
+      }
+
+      byte[] expected = deflate(input);
+
+      compareByteArray(zipHolder.toByteArray(), expected, expected.length);
+   }
+
+   /**
+    * Feeds reference compressed data to an {@link InflaterReader}, pulls the inflated bytes one
+    * at a time and checks that they decode back to the original payload.
+    */
+   public void testInflaterReader() throws Exception
+   {
+      byte[] zipBytes = deflate(INPUT_STRING.getBytes(CHARSET));
+
+      InflaterReader inflater = new InflaterReader(new ByteArrayInputStream(zipBytes));
+
+      ByteArrayOutputStream holder = new ByteArrayOutputStream();
+      for (int read = inflater.read(); read != -1; read = inflater.read())
+      {
+         holder.write(read);
+      }
+
+      // Decode with the charset the payload was encoded with, not the platform default
+      assertEquals(INPUT_STRING, holder.toString(CHARSET));
+   }
+
+   /**
+    * Pushes reference compressed data into an {@link InflaterWriter} in 12-byte chunks and
+    * checks that the underlying stream receives the original payload.
+    */
+   public void testInflaterWriter() throws Exception
+   {
+      byte[] zipBytes = deflate(INPUT_STRING.getBytes(CHARSET));
+      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+      ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+      InflaterWriter writer = new InflaterWriter(byteOutput);
+
+      byte[] zipBuffer = new byte[12];
+      for (int n = byteInput.read(zipBuffer); n > 0; n = byteInput.read(zipBuffer))
+      {
+         writer.write(zipBuffer, 0, n);
+      }
+
+      writer.close();
+
+      // Decode with the charset the payload was encoded with, not the platform default
+      assertEquals(INPUT_STRING, byteOutput.toString(CHARSET));
+   }
+
+   /**
+    * Produces the reference compressed form of {@code input} with a default {@link Deflater}.
+    * <p>
+    * The deflater is drained in a loop until it reports {@code finished()}, so the result is
+    * complete regardless of how large the compressed output is, and the native resources are
+    * released afterwards.
+    *
+    * @param input the raw bytes to compress
+    * @return the complete compressed data
+    */
+   private static byte[] deflate(final byte[] input)
+   {
+      Deflater compresser = new Deflater();
+      try
+      {
+         compresser.setInput(input);
+         compresser.finish();
+
+         ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+         byte[] chunk = new byte[30];
+         while (!compresser.finished())
+         {
+            int n = compresser.deflate(chunk);
+            compressed.write(chunk, 0, n);
+         }
+
+         return compressed.toByteArray();
+      }
+      finally
+      {
+         // Deflater holds native memory that is otherwise only freed on finalization
+         compresser.end();
+      }
+   }
+
+   /**
+    * Asserts that the first {@code length} bytes of both arrays are equal.
+    *
+    * @param first  the bytes under test
+    * @param second the reference bytes
+    * @param length number of leading bytes to compare
+    */
+   private void compareByteArray(byte[] first, byte[] second, int length)
+   {
+      for (int i = 0; i < length; i++)
+      {
+         assertEquals(first[i], second[i]);
+      }
+   }
+}
Copied: trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (from rev 9972, branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/HornetQBufferInputStreamTest.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.HornetQBufferInputStream;
+
+/**
+ * A HornetQBufferInputStreamTest
+ *
+ * Reads a wrapped {@link HornetQBuffer} back through a {@link HornetQBufferInputStream}, using
+ * both the single-byte and the array-based {@code read} methods.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class HornetQBufferInputStreamTest extends UnitTestCase
+{
+
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Fills a 10 KiB buffer with sample bytes, then reads the first 1024 bytes one by one and the
+    * remaining nine 1024-byte chunks in bulk, verifying every byte. Each bulk read must report a
+    * full chunk; once the data is exhausted both read variants must return -1.
+    */
+   public void testReadBytes() throws Exception
+   {
+      final int chunkSize = 1024;
+      final int chunks = 10;
+
+      byte[] bytes = new byte[chunks * chunkSize];
+      for (int i = 0; i < bytes.length; i++)
+      {
+         bytes[i] = getSamplebyte(i);
+      }
+
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      HornetQBufferInputStream is = new HornetQBufferInputStream(buffer);
+
+      // First read byte per byte
+      for (int i = 0; i < chunkSize; i++)
+      {
+         assertEquals(getSamplebyte(i), is.read());
+      }
+
+      // Second, read in chunks
+      for (int i = 1; i < chunks; i++)
+      {
+         byte[] chunk = new byte[chunkSize];
+
+         // The returned count was previously ignored; a short read must fail here, not further down
+         assertEquals(chunkSize, is.read(chunk));
+
+         for (int j = 0; j < chunk.length; j++)
+         {
+            assertEquals(getSamplebyte(i * chunkSize + j), chunk[j]);
+         }
+      }
+
+      // End of stream for the single-byte read ...
+      assertEquals(-1, is.read());
+
+      // ... and for the array-based read
+      int sizeRead = is.read(new byte[chunkSize]);
+
+      assertEquals(-1, sizeRead);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -190,6 +190,7 @@
callTimeout,
HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-12-03 19:28:19 UTC (rev 9992)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-12-05 10:00:09 UTC (rev 9993)
@@ -49,8 +49,6 @@
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
-import static org.hornetq.tests.util.ServiceTestBase.*;
-
/**
*
* Base class with basic utilities on starting up a basic server
More information about the hornetq-commits
mailing list