Author: gaohoward
Date: 2010-11-16 11:34:05 -0500 (Tue, 16 Nov 2010)
New Revision: 9894
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
Log:
Implement an output stream that accepts GZIP-compressed bytes and writes the
decompressed data to the underlying target stream.
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-16
15:27:14 UTC (rev 9893)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java 2010-11-16
16:34:05 UTC (rev 9894)
@@ -57,16 +57,13 @@
final LargeMessageBufferInternal bufferDelegate;
- final Executor threadPool;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate,
final Executor threadPool)
+ public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate,
Executor executor)
{
this.bufferDelegate = bufferDelegate;
- this.threadPool = threadPool;
}
@@ -101,21 +98,7 @@
public void setOutputStream(final OutputStream output) throws HornetQException
{
- try
- {
- PipedOutputStream pipeOut = new PipedOutputStream();
- PipedInputStream pipeIn = new PipedInputStream();
-
- pipeOut.connect(pipeIn);
-
- GZipUtil.pipeGZip(pipeIn, false, threadPool);
-
- bufferDelegate.setOutputStream(pipeOut);
- }
- catch (IOException e)
- {
- throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
e.getMessage(), e);
- }
+ bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
}
public synchronized void saveBuffer(final OutputStream output) throws
HornetQException
Modified:
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
---
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-16
15:27:14 UTC (rev 9893)
+++
branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java 2010-11-16
16:34:05 UTC (rev 9894)
@@ -190,7 +190,114 @@
throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
e.getMessage(), e);
}
}
+
+ public static OutputStream createZipOutputStream(OutputStream out) throws
HornetQException
+ {
+ try
+ {
+ return new GZipOutput(out);
+ }
+ catch (IOException e)
+ {
+ throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY,
e.getMessage(), e);
+ }
+ }
+ public static class GZipOutput extends OutputStream
+ {
+ private OutputStream target;
+ private GZIPInputStream zipIn;
+ private DynamicInputStream receiver;
+
+ public GZipOutput(OutputStream out) throws IOException
+ {
+ target = out;
+ receiver = new DynamicInputStream(1024, 50);
+ }
+
+ public void write(int b) throws IOException
+ {
+ receiver.writeBuffer(b);
+ }
+
+ public void close() throws IOException
+ {
+ zipIn = new GZIPInputStream(receiver);
+ int b = zipIn.read();
+ int counter = 0;
+ while (b != -1)
+ {
+ target.write(b);
+ counter++;
+ b = zipIn.read();
+ }
+ target.close();
+ System.err.println(" total write: " + counter);
+ }
+
+ }
+
+ public static class DynamicInputStream extends InputStream
+ {
+ private List<byte[]> writeBuffer;
+ private int bufferSize;
+ private int counter, index;
+ private int readIndex, readCounter;
+
+ public DynamicInputStream(int size, int cache)
+ {
+ bufferSize = size;
+ writeBuffer = new ArrayList<byte[]>(cache);
+ for (int i = 0; i < cache; i++)
+ {
+ writeBuffer.add(new byte[size]);
+ }
+ counter = 0;
+ index = 0;
+ readIndex = 0;
+ readCounter = 0;
+ }
+
+ //read the buffer. If buffer is empty, return -1
+ public int read() throws IOException
+ {
+ int result = -1;
+
+ if (index > readIndex)
+ {
+ result = writeBuffer.get(readIndex)[readCounter++] & 0xFF;
+ if (readCounter == bufferSize)
+ {
+ readCounter = 0;
+ readIndex ++;
+ }
+ }
+ else if (index == readIndex)
+ {
+ if (counter > readCounter)
+ {
+ result = writeBuffer.get(readIndex)[readCounter++] & 0xFF;
+ }
+ }
+ return result;
+ }
+
+ public void writeBuffer(int b)
+ {
+ writeBuffer.get(index)[counter++] = (byte)b;
+ if (counter == bufferSize)
+ {
+ index++;
+ if (index == writeBuffer.size())
+ {
+ writeBuffer.add(new byte[bufferSize]);
+ }
+ counter = 0;
+ }
+ }
+
+ }
+
/*
* we keep a list of byte arrays. when writing, we start with the first array.
* when getBuffer() is called, the returned value is subject to the following rules:
Modified:
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
---
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-11-16
15:27:14 UTC (rev 9893)
+++
branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2010-11-16
16:34:05 UTC (rev 9894)
@@ -13,6 +13,9 @@
package org.hornetq.tests.integration.client;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
@@ -2634,10 +2637,7 @@
for (int i = 0 ; i < messageSize; i++)
{
- //System.out.print(msg1.getBodyBuffer().readByte() + " ");
- //if (i % 100 == 0) System.out.println();
byte b = msg1.getBodyBuffer().readByte();
- //System.out.println("Byte read: " + (char)b + " i " +
i);
assertEquals("position = " + i, getSamplebyte(i), b);
}
@@ -2670,6 +2670,91 @@
}
}
+ public void testLargeMessageCompression2() throws Exception
+ {
+ final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(isNetty());
+ sf.setCompressLargeMessages(true);
+
+ session = sf.createSession(false, false, false);
+
+ session.createTemporaryQueue(LargeMessageTest.ADDRESS,
LargeMessageTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ String testDir = this.getTestDir();
+ File testFile = new File(testDir, "async_large_message");
+ FileOutputStream output = new FileOutputStream(testFile);
+
+ System.out.println("set out");
+
+ msg1.setOutputStream(output);
+
+ System.out.println("waiting...");
+ msg1.waitOutputStreamCompletion(0);
+
+ System.out.println("close output");
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ //verify
+ FileInputStream input = new FileInputStream(testFile);
+ for (int i = 0 ; i < messageSize; i++)
+ {
+ byte b = (byte)input.read();
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ testFile.delete();
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------