[hornetq-commits] JBoss hornetq SVN: r9894 - in branches/Branch_Large_Message_Compression: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 16 11:34:05 EST 2010


Author: gaohoward
Date: 2010-11-16 11:34:05 -0500 (Tue, 16 Nov 2010)
New Revision: 9894

Modified:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
Log:
implement an output stream to which compressed bits are written but the result is uncompressed.



Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-11-16 16:34:05 UTC (rev 9894)
@@ -57,16 +57,13 @@
 
    final LargeMessageBufferInternal bufferDelegate;
    
-   final Executor threadPool;
-   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, final Executor threadPool)
+   public DecompressedLargeMessageBuffer(final LargeMessageBufferInternal bufferDelegate, Executor executor)
    {
       this.bufferDelegate = bufferDelegate;
-      this.threadPool = threadPool;
    }
 
 
@@ -101,21 +98,7 @@
 
    public void setOutputStream(final OutputStream output) throws HornetQException
    {
-      try
-      {
-         PipedOutputStream pipeOut = new PipedOutputStream();
-         PipedInputStream pipeIn = new PipedInputStream();
-         
-         pipeOut.connect(pipeIn);
-         
-         GZipUtil.pipeGZip(pipeIn, false, threadPool);
-         
-         bufferDelegate.setOutputStream(pipeOut);
-      }
-      catch (IOException e)
-      {
-         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
-      }
+      bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
    }
 
    public synchronized void saveBuffer(final OutputStream output) throws HornetQException

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-16 16:34:05 UTC (rev 9894)
@@ -190,7 +190,114 @@
          throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
       }
    }
+   
+   public static OutputStream createZipOutputStream(OutputStream out) throws HornetQException
+   {
+      try
+      {
+         return new GZipOutput(out);
+      }
+      catch (IOException e)
+      {
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+      }      
+   }
 
+   public static class GZipOutput extends OutputStream
+   {
+      private OutputStream target;
+      private GZIPInputStream zipIn;
+      private DynamicInputStream receiver;
+
+      public GZipOutput(OutputStream out) throws IOException
+      {
+         target = out;
+         receiver = new DynamicInputStream(1024, 50);
+      }
+
+      public void write(int b) throws IOException
+      {
+         receiver.writeBuffer(b);
+      }
+      
+      public void close() throws IOException
+      {
+         zipIn = new GZIPInputStream(receiver);
+         int b = zipIn.read();
+         int counter = 0;
+         while (b != -1)
+         {
+            target.write(b);
+            counter++;
+            b = zipIn.read();
+         }
+         target.close();
+         System.err.println(" total write: " + counter);
+      }
+      
+   }
+   
+   public static class DynamicInputStream extends InputStream
+   {
+      private List<byte[]> writeBuffer;
+      private int bufferSize;
+      private int counter, index;
+      private int readIndex, readCounter;
+      
+      public DynamicInputStream(int size, int cache)
+      {
+         bufferSize = size;
+         writeBuffer = new ArrayList<byte[]>(cache);
+         for (int i = 0; i < cache; i++)
+         {
+            writeBuffer.add(new byte[size]);
+         }
+         counter = 0;
+         index = 0;
+         readIndex = 0;
+         readCounter = 0;
+      }
+
+      //read the buffer. If buffer is empty, return -1
+      public int read() throws IOException
+      {
+         int result = -1;
+         
+         if (index > readIndex)
+         {
+            result = writeBuffer.get(readIndex)[readCounter++] & 0xFF;
+            if (readCounter == bufferSize)
+            {
+               readCounter = 0;
+               readIndex ++;
+            }
+         }
+         else if (index == readIndex)
+         {
+            if (counter > readCounter)
+            {
+               result = writeBuffer.get(readIndex)[readCounter++] & 0xFF;
+            }
+         }
+         return result;
+      }
+      
+      public void writeBuffer(int b)
+      {
+         writeBuffer.get(index)[counter++] = (byte)b;
+         if (counter == bufferSize)
+         {
+            index++;
+            if (index == writeBuffer.size())
+            {
+               writeBuffer.add(new byte[bufferSize]);
+            }
+            counter = 0;
+         }
+      }
+      
+   }
+   
    /*
     * we keep a list of byte arrays. when writing, we start with the first array.
     * when getBuffer() is called, the returned value is subject to the following rules:

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-11-16 15:27:14 UTC (rev 9893)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-11-16 16:34:05 UTC (rev 9894)
@@ -13,6 +13,9 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.util.HashMap;
 
 import javax.transaction.xa.XAResource;
@@ -2634,10 +2637,7 @@
          
          for (int i = 0 ; i < messageSize; i++)
          {
-            //System.out.print(msg1.getBodyBuffer().readByte() + "  ");
-            //if (i % 100 == 0) System.out.println();
             byte b = msg1.getBodyBuffer().readByte();
-            //System.out.println("Byte read: " + (char)b + " i " + i);
             assertEquals("position = "  + i, getSamplebyte(i), b);
          }
 
@@ -2670,6 +2670,91 @@
       }
    }
 
+   public void testLargeMessageCompression2() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = createFactory(isNetty());
+         sf.setCompressLargeMessages(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         String testDir = this.getTestDir();
+         File testFile = new File(testDir, "async_large_message");
+         FileOutputStream output = new FileOutputStream(testFile);
+         
+         System.out.println("set out");
+         
+         msg1.setOutputStream(output);
+         
+         System.out.println("waiting...");
+         msg1.waitOutputStreamCompletion(0);
+         
+         System.out.println("close output");
+         
+         msg1.acknowledge();
+
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         //verify
+         FileInputStream input = new FileInputStream(testFile);
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            byte b = (byte)input.read();
+            assertEquals("position = "  + i, getSamplebyte(i), b);
+         }
+         
+         testFile.delete();
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list