Author: gaohoward
Date: 2010-12-01 09:25:42 -0500 (Wed, 01 Dec 2010)
New Revision: 9960
Added:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
cleanup and tests
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -1257,7 +1257,6 @@
{
try
{
- System.err.println("___writing packet: " + packet + " output
" + output);
output.write(packet.getBody());
if (!packet.isContinues())
{
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -17,8 +17,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
@@ -28,7 +26,6 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.client.impl.LargeMessageBufferImpl;
import org.hornetq.core.client.impl.LargeMessageBufferInternal;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -45,18 +42,15 @@
// Constants -----------------------------------------------------
- /**
- *
- */
private static final String OPERATION_NOT_SUPPORTED = "Operation not
supported";
private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only
buffer, setOperations are not supported";
// Attributes ----------------------------------------------------
-
final LargeMessageBufferInternal bufferDelegate;
+ private long readerIndex = 0;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -411,7 +405,6 @@
public int readerIndex()
{
- // TODO
return 0;
}
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -13,10 +13,8 @@
package org.hornetq.utils;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.zip.Deflater;
/**
@@ -46,9 +44,9 @@
int n = read(buffer, 0, 1);
if (n == 1)
{
- return buffer[0];
+ return (int)buffer[0] & 0xFF;
}
- if (n == -1)
+ if (n == -1 || n == 0)
{
return -1;
}
@@ -86,19 +84,15 @@
{
deflater.end();
compressDone = true;
- read = -1;
break;
}
else if (deflater.needsInput())
{
- System.err.println("need input so read input");
// read some data from inputstream
int m = input.read(readBuffer);
- System.err.println("original data read: " + m);
+
if (m == -1)
{
- System.err.println("no more original data, finish deflater, now
offset " + offset + " len " + len);
-
deflater.finish();
isFinished = true;
}
@@ -119,74 +113,8 @@
offset += n;
len -= n;
}
-
}
return read;
}
- public static void main(String[] args) throws IOException
- {
- String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
- byte[] input = inputString.getBytes("UTF-8");
-
- ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
-
- DeflaterReader reader = new DeflaterReader(inputStream);
-
-
- ArrayList<Integer> zipHolder = new ArrayList<Integer>();
- int b = reader.read();
-
- while (b != -1)
- {
- zipHolder.add(b);
- b = reader.read();
- }
-
- byte[] allCompressed = new byte[zipHolder.size()];
- for (int i = 0; i < allCompressed.length; i++)
- {
- allCompressed[i] = (byte) zipHolder.get(i).intValue();
- }
-
- System.err.println("compressed: " + getBytesString(allCompressed));
-
-/*
- byte[] buffer = new byte[7];
-
- int n = reader.read(buffer);
-
- System.err.println("first read: " + n);
-
- while (n != -1)
- {
- System.err.println("==>read n " + n + " values: " +
getBytesString(buffer));
- n = reader.read(buffer);
- }
-*/
- System.err.println("compressed.");
-
- System.err.println("now verify");
-
- byte[] output = new byte[30];
- Deflater compresser = new Deflater();
- compresser.setInput(input);
- compresser.finish();
- int compressedDataLength = compresser.deflate(output);
- System.err.println("compress len: " + compressedDataLength);
- System.err.println("commpress data: " + getBytesString(output));
-
- }
-
- static String getBytesString(byte[] array)
- {
- StringBuffer bf = new StringBuffer();
- for (byte b : array)
- {
- int val = b & 0xFF;
- bf.append(val + " ");
- }
- return bf.toString();
- }
-
}
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -34,6 +34,7 @@
*/
public class GZipUtil
{
+
public static InputStream createZipInputStream(InputStream input) throws
HornetQException
{
try
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -13,12 +13,9 @@
package org.hornetq.utils;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
import java.util.zip.Inflater;
/**
@@ -50,30 +47,17 @@
this.pointer = -1;
}
- public static void log(String str)
- {
- System.out.println(str);
- }
-
public int read() throws IOException
{
- log("in read");
-
if (pointer == -1)
{
- log("pointer is -1");
-
try
{
- log("need to decompress more bytes");
length = doRead(readBuffer, 0, readBuffer.length);
- log("bytes decompressed:" + length);
if (length == 0)
{
- log("zero byte got, ending");
return -1;
}
- log("reset pointer to zero");
pointer = 0;
}
catch (DataFormatException e)
@@ -82,16 +66,13 @@
}
}
- log("reading byte at " + pointer);
int value = readBuffer[pointer] & 0xFF;
pointer++;
if (pointer == length)
{
- log("buffer all read, set pointer to -1");
pointer = -1;
}
- log("byte got: " + value);
return value;
}
@@ -153,44 +134,5 @@
}
return read;
}
-
- public static void main(String[] args) throws IOException
- {
- String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
- byte[] input = inputString.getBytes("UTF-8");
- byte[] output = new byte[30];
- Deflater compresser = new Deflater();
- compresser.setInput(input);
- compresser.finish();
- int compressedDataLength = compresser.deflate(output);
- System.err.println("compress len: " + compressedDataLength);
- byte[] zipBytes = new byte[compressedDataLength];
-
- System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
- ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
-
- InflaterReader inflater = new InflaterReader(byteInput);
- ArrayList<Integer> holder = new ArrayList<Integer>();
- int read = inflater.read();
-
- while (read != -1)
- {
- holder.add(read);
- read = inflater.read();
- }
-
- byte[] result = new byte[holder.size()];
-
- System.out.println("total bytes: " + holder.size());
- for (int i = 0; i < result.length; i++)
- {
- result[i] = holder.get(i).byteValue();
- }
-
- String txt = new String(result);
- System.out.println("the result: " + txt);
-
- }
-
}
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -47,11 +47,6 @@
{
this.output = output;
}
-
- private void log(String str)
- {
- System.err.println(this + " " + str);
- }
/*
* Write a compressed byte.
@@ -59,7 +54,6 @@
@Override
public void write(int b) throws IOException
{
- log("call write b: " + b);
writeBuffer[writePointer] = (byte)(b & 0xFF);
writePointer++;
@@ -68,7 +62,6 @@
writePointer = 0;
try
{
- log("call doWrite");
doWrite();
}
catch (DataFormatException e)
@@ -81,7 +74,6 @@
@Override
public void close() throws IOException
{
- log("call close");
if (writePointer > 0)
{
inflater.setInput(writeBuffer, 0, writePointer);
@@ -113,43 +105,5 @@
n = inflater.inflate(outputBuffer);
}
}
-
- public static void main(String[] args) throws IOException
- {
- String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
- byte[] input = inputString.getBytes("UTF-8");
- byte[] output = new byte[30];
- Deflater compresser = new Deflater();
- compresser.setInput(input);
- compresser.finish();
- int compressedDataLength = compresser.deflate(output);
- System.err.println("compress len: " + compressedDataLength);
- byte[] zipBytes = new byte[compressedDataLength];
-
- System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
- ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
-
- ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
- InflaterWriter writer = new InflaterWriter(byteOutput);
-
- byte[] zipBuffer = new byte[12];
-
- int n = byteInput.read(zipBuffer);
- while (n > 0)
- {
- System.out.println("Writing: " + n);
- writer.write(zipBuffer, 0, n);
- n = byteInput.read(zipBuffer);
- }
-
- writer.close();
-
- byte[] outcome = byteOutput.toByteArray();
- String outStr = new String(outcome);
-
- System.out.println("Outcome: " + outStr);
-
- }
-
}
Modified:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
---
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -25,12 +25,15 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -736,209 +739,6 @@
}
}
- public void testResendSmallStreamMessage() throws Exception
- {
- internalTestResendMessage(50000);
- }
-
- public void testResendLargeStreamMessage() throws Exception
- {
- internalTestResendMessage(150 * 1024);
- }
-
- public void internalTestResendMessage(final long messageSize) throws Exception
- {
- ClientSession session = null;
-
- try
- {
- server = createServer(true, isNetty());
-
- server.start();
-
- ClientSessionFactory sf = createFactory(isNetty());
- sf.setCompressLargeMessages(true);
-
- session = sf.createSession(false, false, false);
-
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
- SimpleString ADDRESS2 = LargeMessageTest.ADDRESS.concat("-2");
-
- session.createQueue(ADDRESS2, ADDRESS2, true);
-
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
- ClientProducer producer2 = session.createProducer(ADDRESS2);
-
- Message clientFile = createLargeClientMessage(session, messageSize, false);
-
- producer.send(clientFile);
-
- session.commit();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
-
- ClientMessage msg1 = consumer.receive(10000);
- msg1.acknowledge();
-
- producer2.send(msg1);
-
- boolean failed = false;
-
- try
- {
- producer2.send(msg1);
- }
- catch (Throwable e)
- {
- failed = true;
- }
-
- Assert.assertTrue("Exception expected", failed);
-
- session.commit();
-
- ClientMessage msg2 = consumer2.receive(10000);
-
- Assert.assertNotNull(msg2);
-
- msg2.acknowledge();
-
- session.commit();
-
- Assert.assertEquals(messageSize, msg2.getBodySize());
-
- compareString(messageSize, msg2);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- public void testResendCachedSmallStreamMessage() throws Exception
- {
- internalTestResendMessage(50000);
- }
-
- public void testResendCachedLargeStreamMessage() throws Exception
- {
- internalTestCachedResendMessage(150 * 1024);
- }
-
- public void internalTestCachedResendMessage(final long messageSize) throws Exception
- {
- ClientSession session = null;
-
- try
- {
- server = createServer(true, isNetty());
-
- server.start();
-
- ClientSessionFactory sf = createFactory(isNetty());
- sf.setCompressLargeMessages(true);
-
- sf.setMinLargeMessageSize(111);
-
- sf.setCacheLargeMessagesClient(true);
-
- session = sf.createSession(false, false, false);
-
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
- Message originalMsg = createLargeClientMessage(session, messageSize, false);
-
- producer.send(originalMsg);
-
- session.commit();
-
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
- session.start();
-
- ClientMessage msgReceived = consumer.receive(10000);
- msgReceived.acknowledge();
-
- session.commit();
-
- compareString(messageSize, msgReceived);
-
- msgReceived.getBodyBuffer().readerIndex(0);
-
- producer.send(msgReceived);
-
- session.commit();
-
- ClientMessage msgReceived2 = consumer.receive(10000);
-
- msgReceived2.acknowledge();
-
- compareString(messageSize, msgReceived2);
-
- session.commit();
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /**
- * @param messageSize
- * @param msg2
- */
- private void compareString(final long messageSize, ClientMessage msg)
- {
- assertNotNull(msg);
- for (long i = 0; i < messageSize; i++)
- {
- Assert.assertEquals("position " + i, UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
- }
- }
-
public void testFilePersistenceOneHugeMessage() throws Exception
{
testChunks(false,
@@ -2028,149 +1828,6 @@
}
- public void testSimpleRollback() throws Exception
- {
- simpleRollbackInternalTest(false);
- }
-
- public void testSimpleRollbackXA() throws Exception
- {
- simpleRollbackInternalTest(true);
- }
-
- public void simpleRollbackInternalTest(final boolean isXA) throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is
restarted
- // The other binding is acked... The file must be deleted
-
- try
- {
-
- server = createServer(true, isNetty());
-
- server.start();
-
- ClientSessionFactory sf = createFactory(isNetty());
- sf.setCompressLargeMessages(true);
-
- ClientSession session = sf.createSession(isXA, false, false);
-
- Xid xid = null;
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null,
true);
-
- int numberOfBytes = 200000;
-
- session.start();
-
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
- for (int n = 0; n < 10; n++)
- {
- Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2
== 0);
-
- producer.send(clientFile);
-
- Assert.assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
-
- producer.send(clientFile);
-
- Assert.assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- for (int i = 0; i < 2; i++)
- {
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- Assert.assertNotNull(clientMessage);
-
- Assert.assertEquals(numberOfBytes,
clientMessage.getBodyBuffer().writerIndex());
-
- clientMessage.acknowledge();
-
- if (isXA)
- {
- if (i == 0)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- }
- else
- {
- if (i == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- }
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
public void testBufferMultipleLargeMessages() throws Exception
{
ClientSession session = null;
@@ -2540,68 +2197,7 @@
}
}
- // The ClientConsumer should be able to also send ServerLargeMessages as that's
done by the CoreBridge
- public void testSendServerMessage() throws Exception
- {
- HornetQServer server = createServer(true);
- server.start();
-
- ClientSessionFactory sf = createFactory(false);
- sf.setCompressLargeMessages(true);
-
- ClientSession session = sf.createSession(false, false);
-
- try
- {
- LargeServerMessageImpl fileMessage = new
LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-
- fileMessage.setMessageID(1005);
-
- for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
- {
- fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
- }
-
- fileMessage.releaseResources();
-
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
- ClientProducer prod = session.createProducer(LargeMessageTest.ADDRESS);
-
- prod.send(fileMessage);
-
- fileMessage.deleteFile();
-
- session.commit();
-
- session.start();
-
- ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
-
- ClientMessage msg = cons.receive(5000);
-
- Assert.assertNotNull(msg);
-
- Assert.assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
-
- for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
- {
- Assert.assertEquals(UnitTestCase.getSamplebyte(i),
msg.getBodyBuffer().readByte());
- }
-
- msg.acknowledge();
-
- session.commit();
-
- }
- finally
- {
- sf.close();
- server.stop();
- }
- }
-
public void testLargeMessageCompression() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
Modified:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-01
12:23:36 UTC (rev 9959)
+++
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -887,6 +887,7 @@
}
catch (Throwable e)
{
+ log.error("failed", e);
failed = true;
}
Added:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
===================================================================
---
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
(rev 0)
+++
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java 2010-12-01
14:25:42 UTC (rev 9960)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.zip.Deflater;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.InflaterReader;
+import org.hornetq.utils.InflaterWriter;
+
+/**
+ * A CompressionUtilTest
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ *
+ */
+public class CompressionUtilTest extends UnitTestCase
+{
+
+ public void testDeflaterReader() throws Exception
+ {
+ String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+
+ DeflaterReader reader = new DeflaterReader(inputStream);
+
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+ int b = reader.read();
+
+ while (b != -1)
+ {
+ zipHolder.add(b);
+ b = reader.read();
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ compareByteArray(allCompressed, output, compressedDataLength);
+ }
+
+ public void testDeflaterReader2() throws Exception
+ {
+ String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+
+ DeflaterReader reader = new DeflaterReader(inputStream);
+
+ byte[] buffer = new byte[7];
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+
+ int n = reader.read(buffer);
+ while (n != -1)
+ {
+ for (int i = 0; i < n; i++)
+ {
+ zipHolder.add((int)buffer[i]);
+ }
+ n = reader.read(buffer);
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ compareByteArray(allCompressed, output, compressedDataLength);
+ }
+
+ public void testInflaterReader() throws Exception
+ {
+ String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ InflaterReader inflater = new InflaterReader(byteInput);
+ ArrayList<Integer> holder = new ArrayList<Integer>();
+ int read = inflater.read();
+
+ while (read != -1)
+ {
+ holder.add(read);
+ read = inflater.read();
+ }
+
+ byte[] result = new byte[holder.size()];
+
+ for (int i = 0; i < result.length; i++)
+ {
+ result[i] = holder.get(i).byteValue();
+ }
+
+ String txt = new String(result);
+
+ assertEquals(inputString, txt);
+
+ }
+
+ public void testInflaterWriter() throws Exception
+ {
+ String inputString =
"blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+ byte[] input = inputString.getBytes("UTF-8");
+ byte[] output = new byte[30];
+ Deflater compresser = new Deflater();
+ compresser.setInput(input);
+ compresser.finish();
+ int compressedDataLength = compresser.deflate(output);
+
+ byte[] zipBytes = new byte[compressedDataLength];
+
+ System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+ ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+
+ ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+ InflaterWriter writer = new InflaterWriter(byteOutput);
+
+ byte[] zipBuffer = new byte[12];
+
+ int n = byteInput.read(zipBuffer);
+ while (n > 0)
+ {
+ writer.write(zipBuffer, 0, n);
+ n = byteInput.read(zipBuffer);
+ }
+
+ writer.close();
+
+ byte[] outcome = byteOutput.toByteArray();
+ String outStr = new String(outcome);
+
+ assertEquals(inputString, outStr);
+ }
+
+ private void compareByteArray(byte[] first, byte[] second, int length)
+ {
+ for (int i = 0; i < length; i++)
+ {
+ assertEquals(first[i], second[i]);
+ }
+ }
+}