Author: gaohoward
Date: 2010-11-11 07:03:18 -0500 (Thu, 11 Nov 2010)
New Revision: 9873
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
a working compression prototype (non-thread)
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11
02:23:43 UTC (rev 9872)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-11
12:03:18 UTC (rev 9873)
@@ -15,12 +15,19 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -99,7 +106,7 @@
}
while ((size = input.read(readBytes)) > 0)
{
- System.out.println("Read " + size + " bytes on
compressing thread");
+// System.out.println("Read " + size + " bytes on
compressing thread");
out.write(readBytes, 0, size);
}
System.out.println("Finished compressing");
@@ -163,5 +170,212 @@
});
}
+ /*
+ * we keep a list of byte arrays. when writing, we start with the first array.
+ * when getBuffer() is called, the returned value is subject to the following rules:
+ *
+ * 1. if not closed, return the last full array. then update flags and pointers.
+ * 2. if closed, return all the remaining.
+ */
+ public static class DynamicOutputStream extends OutputStream
+ {
+ private List<byte[]> writeBuffer;
+ private int bufferSize;
+ private int counter, index;
+ private int readIndex, nextIndex;
+ private boolean closed;
+
+ public DynamicOutputStream(int size, int cache)
+ {
+ bufferSize = size;
+ writeBuffer = new ArrayList<byte[]>(cache);
+ for (int i = 0; i < cache; i++)
+ {
+ writeBuffer.add(new byte[size]);
+ }
+ counter = 0;
+ index = 0;
+ readIndex = 0;
+ nextIndex = 1;
+ closed = false;
+ }
+
+ public void write(int b) throws IOException
+ {
+ writeBuffer.get(index)[counter++] = (byte)b;
+ if (counter == bufferSize)
+ {
+ index = nextIndex;
+ nextIndex++;
+ if (index == writeBuffer.size())
+ {
+ writeBuffer.add(new byte[bufferSize]);
+ }
+ counter = 0;
+ }
+ }
+
+ public void close() throws IOException
+ {
+ closed = true;
+ }
+
+ /*
+ * logic:
+ * if index > readIndex, return readIndex, then readIndex++
+ * if index == readIndex, then return zero length byte[]. adjust nextIndex; if
closed, return the remaining.
+ * if index < readIndex, then return readIndex, readIndex = 0
+ *
+ * if closed and no more data, returns null.
+ */
+ public byte[] getBuffer()
+ {
+ byte[] result = new byte[0];
+ if (index > readIndex)
+ {
+ result = writeBuffer.get(readIndex);
+ readIndex++;
+ }
+ else if (index == readIndex)
+ {
+ if (closed)
+ {
+ if (counter == 0)
+ {
+ result = null;
+ }
+ else
+ {
+ result = new byte[counter];
+ System.arraycopy(writeBuffer.get(index), 0, result, 0, result.length);
+ counter = 0;
+ }
+ }
+ }
+ else if (index < readIndex)
+ {
+ result = writeBuffer.get(readIndex);
+ readIndex = 0;
+ }
+ return result;
+ }
+
+ }
+
+ public static class GZipPipe
+ {
+ private InputStream input;
+ private byte[] readBuffer;
+ private GZIPOutputStream zipOut;
+ private DynamicOutputStream receiver;
+
+ public GZipPipe(InputStream raw, int size) throws IOException
+ {
+ input = raw;
+ readBuffer = new byte[size];
+ receiver = new DynamicOutputStream(size, 50);
+ zipOut = new GZIPOutputStream(receiver);
+ }
+
+ public byte[] read() throws IOException
+ {
+ byte[] result = receiver.getBuffer();
+ if (result == null)
+ {
+ return null;
+ }
+ else if (result.length > 0)
+ {
+ return result;
+ }
+
+ int n = input.read(readBuffer);
+ while (true)
+ {
+ if (n > 0)
+ {
+ zipOut.write(readBuffer, 0, n);
+ result = receiver.getBuffer();
+ if ((result != null) && (result.length > 0))
+ {
+ break;
+ }
+ n = input.read(readBuffer);
+ }
+ else
+ {
+ zipOut.close();
+ result = receiver.getBuffer();
+ break;
+ }
+ }
+ return result;
+ }
+ }
+
+ public static void main(String[] args) throws HornetQException, IOException
+ {
+ long begin = System.currentTimeMillis();
+/*
+ FileInputStream input = new
FileInputStream("/home/howard/tmp/jbm.log.1");
+ FileOutputStream output = new
FileOutputStream("/home/howard/tmp/output3.zip");
+ GZIPOutputStream zipOut = new GZIPOutputStream(output);
+
+ byte[] buffer = new byte[1024];
+
+ int n = input.read(buffer);
+
+ int counter = 0;
+
+ while (n > 0)
+ {
+ zipOut.write(buffer, 0, n);
+ counter += n;
+ n = input.read(buffer);
+ }
+ zipOut.close();
+
+ System.out.println("----total output: " + counter);
+*/
+
+ FileInputStream input = new
FileInputStream("/home/howard/tmp/jbm.log.1");
+ FileOutputStream output = new
FileOutputStream("/home/howard/tmp/myzip.zip");
+ GZipPipe pipe = new GZipPipe(input, 2048);
+
+ byte[] buffer;
+
+ buffer = pipe.read();
+
+ while (buffer != null)
+ {
+ //System.out.println("buffer size: " + buffer.length);
+ output.write(buffer);
+ buffer = pipe.read();
+ }
+
+ output.close();
+
+/*
+ FileInputStream input = new
FileInputStream("/home/howard/tmp/jbm.log.1");
+ FileOutputStream output = new
FileOutputStream("/home/howard/tmp/output.zip");
+ ExecutorService service = Executors.newCachedThreadPool();
+ InputStream result = GZipUtil.pipeGZip(input, true, service);
+
+ byte[] buffer = new byte[2048];
+ int n = result.read(buffer);
+ System.out.println("got first data");
+
+ while (n > 0)
+ {
+ output.write(buffer);
+ n = result.read(buffer);
+ }
+*/
+ long end = System.currentTimeMillis();
+
+
+ System.out.println("done. time: " + (end - begin));
+ }
+
}
Modified:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-11
02:23:43 UTC (rev 9872)
+++
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2010-11-11
12:03:18 UTC (rev 9873)
@@ -2684,6 +2684,75 @@
}
}
+ public void testLargeMessageCompression() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ msg1.acknowledge();
+ session.commit();
+ Assert.assertNotNull(msg1);
+
+ consumer.close();
+
+ try
+ {
+ msg1.getBodyBuffer().readByte();
+ Assert.fail("Exception was expected");
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------