[hornetq-commits] JBoss hornetq SVN: r9873 - in branches/Branch_Large_Message_Compression: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 11 07:03:18 EST 2010


Author: gaohoward
Date: 2010-11-11 07:03:18 -0500 (Thu, 11 Nov 2010)
New Revision: 9873

Modified:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
A working compression prototype (non-threaded)


Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-11 02:23:43 UTC (rev 9872)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-11 12:03:18 UTC (rev 9873)
@@ -15,12 +15,19 @@
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -99,7 +106,7 @@
                }
                while ((size = input.read(readBytes)) > 0)
                {
-                  System.out.println("Read " + size + " bytes on compressing thread");
+//                  System.out.println("Read " + size + " bytes on compressing thread");
                   out.write(readBytes, 0, size);
                }
                System.out.println("Finished compressing");
@@ -163,5 +170,212 @@
       });
    }
 
+   /*
+    * An OutputStream that stores written bytes in a list of fixed-size byte arrays ("chunks"),
+    * filled in order starting with the first. A reader drains the data through getBuffer():
+    * 1. while the stream is open, only completely filled chunks are returned, oldest first;
+    * 2. once closed, the partially filled last chunk is returned as well, then null marks the end.
+    * NOTE(review): chunks are never removed or reused, so the list retains everything written.
+    */
+   public static class DynamicOutputStream extends OutputStream
+   {
 
+      private List<byte[]> writeBuffer; // chunks in write order; grows on demand
+      private int bufferSize; // length of every chunk
+      private int counter, index; // write position: offset 'counter' inside chunk 'index'
+      private int readIndex, nextIndex; // next chunk to hand out / next chunk to write into
+      private boolean closed; // set by close(); lets getBuffer() drain the partial chunk
+      // Pre-allocates 'cache' chunks of 'size' bytes each; write() adds more when those run out.
+      public DynamicOutputStream(int size, int cache)
+      {
+         bufferSize = size;
+         writeBuffer = new ArrayList<byte[]>(cache);
+         for (int i = 0; i < cache; i++)
+         {
+            writeBuffer.add(new byte[size]);
+         }
+         counter = 0;
+         index = 0;
+         readIndex = 0;
+         nextIndex = 1;
+         closed = false;
+      }
+      // Appends one byte; when the current chunk becomes full, moves to the next one (allocating it if missing).
+      public void write(int b) throws IOException
+      {
+         writeBuffer.get(index)[counter++] = (byte)b;
+         if (counter == bufferSize)
+         {
+            index = nextIndex;
+            nextIndex++;
+            if (index == writeBuffer.size())
+            {
+               writeBuffer.add(new byte[bufferSize]);
+            }
+            counter = 0;
+         }
+      }
+      // Only marks the stream as closed; nothing is flushed or released, and later writes are not rejected.
+      public void close() throws IOException
+      {
+         closed = true;
+      }
+
+      /*
+       * Hands the next piece of written data to the reader:
+       * - index > readIndex: returns the filled chunk at readIndex (the internal array, not a copy)
+       *   and advances readIndex.
+       * - index == readIndex: returns a zero-length array while open; once closed, returns a copy
+       *   of the first 'counter' bytes of the current chunk, or null when nothing is left.
+       * - index < readIndex: NOTE(review) looks unreachable, since index is never reset - confirm.
+       */
+      public byte[] getBuffer()
+      {
+         byte[] result = new byte[0];
+         if (index > readIndex)
+         {
+            result = writeBuffer.get(readIndex);
+            readIndex++;
+         }
+         else if (index == readIndex)
+         {
+            if (closed)
+            {
+               if (counter == 0)
+               {
+                  result = null;
+               }
+               else
+               {
+                  result = new byte[counter];
+                  System.arraycopy(writeBuffer.get(index), 0, result, 0, result.length);
+                  counter = 0;
+               }
+            }
+         }
+         else if (index < readIndex)
+         {
+            result = writeBuffer.get(readIndex);
+            readIndex = 0;
+         }
+         return result;
+      }
+
+   }
+   // Pull-style gzip compressor: each read() call returns the next block of the gzip-compressed form of 'input'.
+   public static class GZipPipe
+   {
+      private InputStream input; // raw data source; NOTE(review): never closed by this class
+      private byte[] readBuffer; // scratch buffer for raw bytes read from 'input'
+      private GZIPOutputStream zipOut; // compressor writing into 'receiver'
+      private DynamicOutputStream receiver; // collects the compressed bytes until read() hands them out
+      // 'size' sets both the raw read buffer length and the receiver's chunk length (50 chunks pre-allocated).
+      public GZipPipe(InputStream raw, int size) throws IOException
+      {
+         input = raw;
+         readBuffer = new byte[size];
+         receiver = new DynamicOutputStream(size, 50);
+         zipOut = new GZIPOutputStream(receiver);
+      }
+      // Returns the next block of compressed bytes, or null when the input is exhausted and all output was returned.
+      public byte[] read() throws IOException
+      {
+         byte[] result = receiver.getBuffer();
+         if (result == null)
+         {
+            return null;
+         }
+         else if (result.length > 0)
+         {
+            return result;
+         }
+         // Nothing ready yet: feed raw input to the compressor until it produces a block or the input ends.
+         int n = input.read(readBuffer);
+         while (true)
+         {
+            if (n > 0)
+            {
+               zipOut.write(readBuffer, 0, n);
+               result = receiver.getBuffer();
+               if ((result != null) && (result.length > 0))
+               {
+                  break;
+               }
+               n = input.read(readBuffer);
+            }
+            else
+            {
+               zipOut.close(); // input.read() returned <= 0: finish the gzip stream, which also closes 'receiver'
+               result = receiver.getBuffer();
+               break;
+            }
+         }
+         return result;
+      }
+   }
+   // Ad-hoc timing harness: gzips a hard-coded local file through GZipPipe and prints the elapsed milliseconds.
+   public static void main(String[] args) throws HornetQException, IOException
+   {
+      long begin = System.currentTimeMillis();
+/*
+      FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+      FileOutputStream output = new FileOutputStream("/home/howard/tmp/output3.zip");
+      GZIPOutputStream zipOut = new GZIPOutputStream(output);
+      
+      byte[] buffer = new byte[1024];
+      
+      int n = input.read(buffer);
+      
+      int counter = 0;
+      
+      while (n > 0)
+      {
+         zipOut.write(buffer, 0, n);
+         counter += n;
+         n = input.read(buffer);
+      }
+      zipOut.close();
+      
+      System.out.println("----total output: " + counter);
+*/
+      // Active variant: stream the file through GZipPipe and write each compressed block to 'output'.
+      FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+      FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
+      GZipPipe pipe = new GZipPipe(input, 2048);
+      // NOTE(review): the bytes written are a gzip stream (GZIPOutputStream), despite the .zip file name.
+      byte[] buffer;
+      
+      buffer = pipe.read();
+      // pipe.read() returns null once the whole input has been compressed and handed out.
+      while (buffer != null)
+      {
+         //System.out.println("buffer size: " + buffer.length);
+         output.write(buffer);
+         buffer = pipe.read();
+      }
+      // NOTE(review): 'input' is never closed, and 'output' stays open if a read/write above throws.
+      output.close();
+
+/*
+      FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+      FileOutputStream output = new FileOutputStream("/home/howard/tmp/output.zip");
+      ExecutorService service = Executors.newCachedThreadPool();
+      InputStream result = GZipUtil.pipeGZip(input, true, service);
+      
+      byte[] buffer = new byte[2048];
+      int n = result.read(buffer);
+      System.out.println("got first data");
+      
+      while (n > 0)
+      {
+         output.write(buffer);
+         n = result.read(buffer);
+      }
+*/
+      long end = System.currentTimeMillis();
+      
+      
+      System.out.println("done. time: " + (end - begin));
+   }
+
 }

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-11-11 02:23:43 UTC (rev 9872)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-11-11 12:03:18 UTC (rev 9873)
@@ -2684,6 +2684,75 @@
       }
    }
 
+   public void testLargeMessageCompression() throws Exception
+   {
+      final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+      // Flow: send one message of messageSize bytes, receive and acknowledge it, close the consumer,
+      ClientSession session = null;
+      // then expect reading the body to fail and finally run validateNoFilesOnLargeDir().
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = createFactory(isNetty());
+
+         session = sf.createSession(false, false, false);
+
+         session.createTemporaryQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+         // NOTE(review): nothing in this method visibly enables compression; presumably done elsewhere - confirm.
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         msg1.acknowledge(); // NOTE(review): msg1 is dereferenced before the assertNotNull below; a null receive would surface as an NPE
+         session.commit();
+         Assert.assertNotNull(msg1);
+
+         consumer.close();
+
+         try
+         {
+            msg1.getBodyBuffer().readByte();
+            Assert.fail("Exception was expected");
+         }
+         catch (Throwable ignored) // NOTE(review): presumably also swallows the Throwable raised by Assert.fail above, so this check cannot fail - confirm
+         {
+         }
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored) // cleanup failures are deliberately ignored
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored) // cleanup failures are deliberately ignored
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list