[hornetq-commits] JBoss hornetq SVN: r9875 - in branches/Branch_Large_Message_Compression/src/main/org/hornetq: utils and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 11 09:44:07 EST 2010


Author: gaohoward
Date: 2010-11-11 09:44:07 -0500 (Thu, 11 Nov 2010)
New Revision: 9875

Modified:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
Log:
Let the GZipPipe util class extend InputStream


Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-11-11 14:44:07 UTC (rev 9875)
@@ -411,7 +411,8 @@
 
       if (session.isCompressLargeMessages())
       {
-         input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+         //input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
+         input = GZipUtil.createZipInputStream(inputStreamParameter);
       }
 
       while (!lastPacket)

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-11 12:32:35 UTC (rev 9874)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-11-11 14:44:07 UTC (rev 9875)
@@ -16,6 +16,7 @@
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -166,6 +167,18 @@
          }
       });
    }
+   
+   public static InputStream createZipInputStream(InputStream input) throws HornetQException
+   {
+      try
+      {
+         return new GZipPipe(input, 1024);
+      }
+      catch (IOException e)
+      {
+         throw new HornetQException(HornetQException.LARGE_MESSAGE_ERROR_BODY, e.getMessage(), e);
+      }
+   }
 
    /*
     * we keep a list of byte arrays. when writing, we start with the first array.
@@ -249,15 +262,16 @@
          }
          return result;
       }
-
    }
    
-   public static class GZipPipe
+   public static class GZipPipe extends InputStream
    {
       private InputStream input;
       private byte[] readBuffer;
       private GZIPOutputStream zipOut;
       private DynamicOutputStream receiver;
+      private int readPointer;
+      private byte[] buffer;
       
       public GZipPipe(InputStream raw, int size) throws IOException
       {
@@ -265,10 +279,30 @@
          readBuffer = new byte[size];
          receiver = new DynamicOutputStream(size, 50);
          zipOut = new GZIPOutputStream(receiver);
+         readPointer = 0;
+         buffer = read1();
       }
       
-      public byte[] read() throws IOException
+      public int read() throws IOException
       {
+         if (buffer == null)
+         {
+            return -1;
+         }
+         
+         int val = buffer[readPointer] & 0xFF;
+         readPointer++;
+         if (readPointer == buffer.length)
+         {
+            buffer = read1();
+            readPointer = 0;
+         }
+
+         return val;
+      }
+      
+      public byte[] read1() throws IOException
+      {
          byte[] result = receiver.getBuffer();
          if (result == null)
          {
@@ -327,24 +361,7 @@
       
       System.out.println("----total output: " + counter);
 */
-
-      FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
-      FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
-      GZipPipe pipe = new GZipPipe(input, 2048);
-      
-      byte[] buffer;
-      
-      buffer = pipe.read();
-      
-      while (buffer != null)
-      {
-         //System.out.println("buffer size: " + buffer.length);
-         output.write(buffer);
-         buffer = pipe.read();
-      }
-
-      output.close();
-
+      unzip();
 /*
       FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
       FileOutputStream output = new FileOutputStream("/home/howard/tmp/output.zip");
@@ -366,5 +383,48 @@
       
       System.out.println("done. time: " + (end - begin));
    }
+   
+   public static void zip() throws IOException
+   {
+      FileInputStream input = new FileInputStream("/home/howard/tmp/jbm.log.1");
+      FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.zip");
+      GZipPipe pipe = new GZipPipe(input, 2048);
+      
+      byte[] buffer = new byte[2048];
+      
+      int n = pipe.read(buffer);
+      
+      while (n != -1)
+      {
+         if (n > 0)
+         {
+            output.write(buffer, 0, n);
+         }
+         n = pipe.read(buffer);
+      }
 
+      output.close();
+   }
+
+   public static void unzip() throws IOException
+   {
+      FileInputStream input = new FileInputStream("/home/howard/tmp/myzip.zip");
+      FileOutputStream output = new FileOutputStream("/home/howard/tmp/myzip.out");
+
+      GZIPInputStream zipIn = new GZIPInputStream(input);
+      
+      byte[] buffer = new byte[1024];
+      
+      int n = zipIn.read(buffer);
+      
+      while (n > 0)
+      {
+         //System.out.println("buffer size: " + buffer.length);
+         output.write(buffer, 0, n);
+         n = zipIn.read(buffer);
+      }
+
+      output.close();
+   }
+
 }



More information about the hornetq-commits mailing list