Author: gaohoward
Date: 2010-11-11 09:44:07 -0500 (Thu, 11 Nov 2010)
New Revision: 9875
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
Log:
Let the GZipPipe util class extend InputStream
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2010-11-11 14:44:07 UTC (rev 9875)
@@ -411,7 +411,8 @@
if (session.isCompressLargeMessages())
{
- input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ //input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+ input = GZipUtil.createZipInputStream(inputStreamParameter);
}
while (!lastPacket)
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11 14:44:07 UTC (rev 9875)
@@ -16,6 +16,7 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -166,6 +167,18 @@
}
});
}
+
+ public static InputStream createZipInputStream(InputStream input) throws HornetQException
+ {
+ try
+ {
+ return new GZipPipe(input, 1024);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+ }
+ }
/*
* we keep a list of byte arrays. when writing, we start with the first array.
@@ -249,15 +262,16 @@
}
return result;
}
-
}
- public static class GZipPipe
+ public static class GZipPipe extends InputStream
{
private InputStream input;
private byte[] readBuffer;
private GZIPOutputStream zipOut;
private DynamicOutputStream receiver;
+ private int readPointer;
+ private byte[] buffer;
public GZipPipe(InputStream raw, int size) throws IOException
{
@@ -265,10 +279,30 @@
readBuffer = new byte[size];
receiver = new DynamicOutputStream(size, 50);
zipOut = new GZIPOutputStream(receiver);
+ readPointer = 0;
+ buffer = read1();
}
- public byte[] read() throws IOException
+ public int read() throws IOException
{
+ if (buffer == null)
+ {
+ return -1;
+ }
+
+ int val = buffer[readPointer] & 0xFF;
+ readPointer++;
+ if (readPointer == buffer.length)
+ {
+ buffer = read1();
+ readPointer = 0;
+ }
+
+ return val;
+ }
+
+ public byte[] read1() throws IOException
+ {
byte[] result = receiver.getBuffer();
if (result == null)
{
@@ -327,24 +361,7 @@
System.out.println("----total output: " + counter);
*/
-
- FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
- FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
- GZipPipe pipe = new GZipPipe(input, 2048);
-
- byte[] buffer;
-
- buffer = pipe.read();
-
- while (buffer != null)
- {
- //System.out.println("buffer size: " + buffer.length);
- output.write(buffer);
- buffer = pipe.read();
- }
-
- output.close();
-
+ unzip();
/*
FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
FileOutputStream output = new FileOutputStream("/home/howard/tmp/output.zip");
@@ -366,5 +383,48 @@
System.out.println("done. time: " + (end - begin));
}
+
+ public static void zip() throws IOException
+ {
+ FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+ FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
+ GZipPipe pipe = new GZipPipe(input, 2048);
+
+ byte[] buffer = new byte[2048];
+
+ int n = pipe.read(buffer);
+
+ while (n != -1)
+ {
+ if (n > 0)
+ {
+ output.write(buffer, 0, n);
+ }
+ n = pipe.read(buffer);
+ }
+ output.close();
+ }
+
+ public static void unzip() throws IOException
+ {
+ FileInputStream input = new FileInputStream("/home/howard/tmp/myzip.zip");
+ FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.out");
+
+ GZIPInputStream zipIn = new GZIPInputStream(input);
+
+ byte[] buffer = new byte[1024];
+
+ int n = zipIn.read(buffer);
+
+ while (n > 0)
+ {
+ //System.out.println("buffer size: " + buffer.length);
+ output.write(buffer, 0, n);
+ n = zipIn.read(buffer);
+ }
+
+ output.close();
+ }
+
}