Author: gaohoward
Date: 2010-11-30 10:39:22 -0500 (Tue, 30 Nov 2010)
New Revision: 9952
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
Log:
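Replace the old GZipUtil-based large message compression with the DeflaterReader/InflaterWriter/InflaterReader implementation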
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-30
14:30:46 UTC (rev 9951)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
@@ -13,16 +13,8 @@
package org.hornetq.core.client.impl;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.concurrent.Executor;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -30,13 +22,12 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.DeflaterReader;
import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;
@@ -411,8 +402,7 @@
if (session.isCompressLargeMessages())
{
- //input = GZipUtil.pipeGZip(inputStreamParameter, true,
session.getThreadPool());
- input = GZipUtil.createZipInputStream(inputStreamParameter);
+ input = new DeflaterReader(inputStreamParameter);
}
while (!lastPacket)
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-11-30
14:30:46 UTC (rev 9951)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
@@ -1257,6 +1257,7 @@
{
try
{
+ System.err.println("___writing packet: " + packet + " output
" + output);
output.write(packet.getBody());
if (!packet.isContinues())
{
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-30
14:30:46 UTC (rev 9951)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-30
15:39:22 UTC (rev 9952)
@@ -98,7 +98,7 @@
public void setOutputStream(final OutputStream output) throws HornetQException
{
- bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
+ bufferDelegate.setOutputStream(new InflaterWriter(output));
}
public synchronized void saveBuffer(final OutputStream output) throws
HornetQException
@@ -145,7 +145,7 @@
{
InputStream input = new HornetQBufferInputStream(bufferDelegate);
- dataInput = new DataInputStream(GZipUtil.createUnZipInputStream(input));
+ dataInput = new DataInputStream(new InflaterReader(input));
}
catch (Exception e)
{
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30
14:30:46 UTC (rev 9951)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java 2010-11-30
15:39:22 UTC (rev 9952)
@@ -16,6 +16,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.zip.Deflater;
/**
@@ -26,7 +27,7 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*
*/
-public class DeflaterReader
+public class DeflaterReader extends InputStream
{
private Deflater deflater = new Deflater();
private boolean isFinished = false;
@@ -38,12 +39,22 @@
{
input = inData;
}
-
- public int read(byte[] buffer) throws IOException
+
+ public int read() throws IOException
{
- return read(buffer, 0, buffer.length);
+ byte[] buffer = new byte[1];
+ int n = read(buffer, 0, 1);
+ if (n == 1)
+ {
+ return buffer[0] & 0xFF; // InputStream.read() must yield 0..255; an unmasked byte 0xFF would be returned as -1 and look like end of stream
+ }
+ if (n == -1)
+ {
+ return -1;
+ }
+ throw new IOException("Error reading data, invalid n: " + n);
}
-
+
/**
* Try to fill the buffer with compressed bytes. Except the last effective read,
* this method always returns with a full buffer of compressed data.
@@ -52,6 +63,7 @@
* @return the number of bytes really filled, -1 indicates end.
* @throws IOException
*/
+ @Override
public int read(byte[] buffer, int offset, int len) throws IOException
{
if (compressDone)
@@ -74,6 +86,7 @@
{
deflater.end();
compressDone = true;
+ read = -1;
break;
}
else if (deflater.needsInput())
@@ -120,6 +133,25 @@
DeflaterReader reader = new DeflaterReader(inputStream);
+
+ ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+ int b = reader.read();
+
+ while (b != -1)
+ {
+ zipHolder.add(b);
+ b = reader.read();
+ }
+
+ byte[] allCompressed = new byte[zipHolder.size()];
+ for (int i = 0; i < allCompressed.length; i++)
+ {
+ allCompressed[i] = (byte) zipHolder.get(i).intValue();
+ }
+
+ System.err.println("compressed: " + getBytesString(allCompressed));
+
+/*
byte[] buffer = new byte[7];
int n = reader.read(buffer);
@@ -131,7 +163,7 @@
System.err.println("==>read n " + n + " values: " +
getBytesString(buffer));
n = reader.read(buffer);
}
-
+*/
System.err.println("compressed.");
System.err.println("now verify");
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-11-30
14:30:46 UTC (rev 9951)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java 2010-11-30
15:39:22 UTC (rev 9952)
@@ -47,6 +47,11 @@
{
this.output = output;
}
+
+ private void log(String str)
+ {
+ System.err.println(this + " " + str);
+ }
/*
* Write a compressed byte.
@@ -54,6 +59,7 @@
@Override
public void write(int b) throws IOException
{
+ log("call write b: " + b);
writeBuffer[writePointer] = (byte)(b & 0xFF);
writePointer++;
@@ -62,6 +68,7 @@
writePointer = 0;
try
{
+ log("call doWrite");
doWrite();
}
catch (DataFormatException e)
@@ -74,6 +81,7 @@
@Override
public void close() throws IOException
{
+ log("call close");
if (writePointer > 0)
{
inflater.setInput(writeBuffer, 0, writePointer);