[hornetq-commits] JBoss hornetq SVN: r9952 - in branches/Branch_Large_Message_Compression/src/main/org/hornetq: utils and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 30 10:39:23 EST 2010


Author: gaohoward
Date: 2010-11-30 10:39:22 -0500 (Tue, 30 Nov 2010)
New Revision: 9952

Modified:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
Log:
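Replace the old GZipUtil-based compression streams with the new DeflaterReader / InflaterWriter / InflaterReader implementation.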


Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
@@ -13,16 +13,8 @@
 
 package org.hornetq.core.client.impl;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.concurrent.Executor;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -30,13 +22,12 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
-import org.hornetq.utils.GZipUtil;
+import org.hornetq.utils.DeflaterReader;
 import org.hornetq.utils.HornetQBufferInputStream;
 import org.hornetq.utils.TokenBucketLimiter;
 import org.hornetq.utils.UUIDGenerator;
@@ -411,8 +402,7 @@
 
       if (session.isCompressLargeMessages())
       {
-         //input = GZipUtil.pipeGZip(inputStreamParameter, true, session.getThreadPool());
-         input = GZipUtil.createZipInputStream(inputStreamParameter);
+         input = new DeflaterReader(inputStreamParameter);
       }
 
       while (!lastPacket)

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-11-30 15:39:22 UTC (rev 9952)
@@ -1257,6 +1257,7 @@
    {
       try
       {
+         System.err.println("___writing packet: " + packet + " output " + output);
          output.write(packet.getBody());
          if (!packet.isContinues())
          {

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-11-30 15:39:22 UTC (rev 9952)
@@ -98,7 +98,7 @@
 
    public void setOutputStream(final OutputStream output) throws HornetQException
    {
-      bufferDelegate.setOutputStream(GZipUtil.createZipOutputStream(output));
+      bufferDelegate.setOutputStream(new InflaterWriter(output));
    }
 
    public synchronized void saveBuffer(final OutputStream output) throws HornetQException
@@ -145,7 +145,7 @@
          {
             InputStream input = new HornetQBufferInputStream(bufferDelegate);
             
-            dataInput = new DataInputStream(GZipUtil.createUnZipInputStream(input));
+            dataInput = new DataInputStream(new InflaterReader(input));
          }
          catch (Exception e)
          {

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java	2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java	2010-11-30 15:39:22 UTC (rev 9952)
@@ -16,6 +16,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.zip.Deflater;
 
 /**
@@ -26,7 +27,7 @@
  * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  *
  */
-public class DeflaterReader
+public class DeflaterReader extends InputStream
 {
    private Deflater deflater = new Deflater();
    private boolean isFinished = false;
@@ -38,12 +39,22 @@
    {
       input = inData;
    }
-   
-   public int read(byte[] buffer) throws IOException
+
+   public int read() throws IOException
    {
-      return read(buffer, 0, buffer.length);
+      byte[] buffer = new byte[1];
+      int n = read(buffer, 0, 1);
+      if (n == 1)
+      {
+         return buffer[0];
+      }
+      if (n == -1)
+      {
+         return -1;
+      }
+      throw new IOException("Error reading data, invalid n: " + n);
    }
-   
+
    /**
     * Try to fill the buffer with compressed bytes. Except the last effective read,
     * this method always returns with a full buffer of compressed data.
@@ -52,6 +63,7 @@
     * @return the number of bytes really filled, -1 indicates end.
     * @throws IOException 
     */
+   @Override
    public int read(byte[] buffer, int offset, int len) throws IOException
    {
       if (compressDone)
@@ -74,6 +86,7 @@
             {
                deflater.end();
                compressDone = true;
+               read = -1;
                break;
             }
             else if (deflater.needsInput())
@@ -120,6 +133,25 @@
       
       DeflaterReader reader = new DeflaterReader(inputStream);
       
+
+      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+      int b = reader.read();
+      
+      while (b != -1)
+      {
+         zipHolder.add(b);
+         b = reader.read();
+      }
+      
+      byte[] allCompressed = new byte[zipHolder.size()];
+      for (int i = 0; i < allCompressed.length; i++)
+      {
+         allCompressed[i] = (byte) zipHolder.get(i).intValue();
+      }
+      
+      System.err.println("compressed: " + getBytesString(allCompressed));
+
+/*
       byte[] buffer = new byte[7];
       
       int n = reader.read(buffer);
@@ -131,7 +163,7 @@
          System.err.println("==>read n " + n + " values: " + getBytesString(buffer));
          n = reader.read(buffer);
       }
-      
+*/
       System.err.println("compressed.");
       
       System.err.println("now verify");

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java	2010-11-30 14:30:46 UTC (rev 9951)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java	2010-11-30 15:39:22 UTC (rev 9952)
@@ -47,6 +47,11 @@
    {
       this.output = output;
    }
+   
+   private void log(String str)
+   {
+      System.err.println(this + " " + str);
+   }
 
    /*
     * Write a compressed byte.
@@ -54,6 +59,7 @@
    @Override
    public void write(int b) throws IOException
    {
+      log("call write b: " + b);
       writeBuffer[writePointer] = (byte)(b & 0xFF);
       writePointer++;
       
@@ -62,6 +68,7 @@
          writePointer = 0;
          try
          {
+            log("call doWrite");
             doWrite();
          }
          catch (DataFormatException e)
@@ -74,6 +81,7 @@
    @Override
    public void close() throws IOException
    {
+      log("call close");
       if (writePointer > 0)
       {
          inflater.setInput(writeBuffer, 0, writePointer);



More information about the hornetq-commits mailing list