[hornetq-commits] JBoss hornetq SVN: r9960 - in branches/Branch_Large_Message_Compression: src/main/org/hornetq/utils and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 1 09:25:43 EST 2010


Author: gaohoward
Date: 2010-12-01 09:25:42 -0500 (Wed, 01 Dec 2010)
New Revision: 9960

Added:
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
Modified:
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
   branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
   branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
cleanup and tests



Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -1257,7 +1257,6 @@
    {
       try
       {
-         System.err.println("___writing packet: " + packet + " output " + output);
          output.write(packet.getBody());
          if (!packet.isContinues())
          {

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DecompressedLargeMessageBuffer.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -17,8 +17,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
@@ -28,7 +26,6 @@
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.client.impl.LargeMessageBufferImpl;
 import org.hornetq.core.client.impl.LargeMessageBufferInternal;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -45,18 +42,15 @@
 
    // Constants -----------------------------------------------------
 
-   /**
-    * 
-    */
    private static final String OPERATION_NOT_SUPPORTED = "Operation not supported";
 
    private static final String READ_ONLY_ERROR_MESSAGE = "This is a read-only buffer, setOperations are not supported";
 
    // Attributes ----------------------------------------------------
 
-
    final LargeMessageBufferInternal bufferDelegate;
    
+   private long readerIndex = 0;
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -411,7 +405,6 @@
 
    public int readerIndex()
    {
-      // TODO
       return 0;
    }
 

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/DeflaterReader.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -13,10 +13,8 @@
 
 package org.hornetq.utils;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.zip.Deflater;
 
 /**
@@ -46,9 +44,9 @@
       int n = read(buffer, 0, 1);
       if (n == 1)
       {
-         return buffer[0];
+         return (int)buffer[0] & 0xFF;
       }
-      if (n == -1)
+      if (n == -1 || n == 0)
       {
          return -1;
       }
@@ -86,19 +84,15 @@
             {
                deflater.end();
                compressDone = true;
-               read = -1;
                break;
             }
             else if (deflater.needsInput())
             {
-               System.err.println("need input so read input");
                // read some data from inputstream
                int m = input.read(readBuffer);
-               System.err.println("original data read: " + m);
+               
                if (m == -1)
                {
-                  System.err.println("no more original data, finish deflater, now offset " + offset + " len " + len);
-                  
                   deflater.finish();
                   isFinished = true;
                }
@@ -119,74 +113,8 @@
             offset += n;
             len -= n;
          }
-         
       }
       return read;
    }
    
-   public static void main(String[] args) throws IOException
-   {
-      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
-      byte[] input = inputString.getBytes("UTF-8");
-
-      ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
-      
-      DeflaterReader reader = new DeflaterReader(inputStream);
-      
-
-      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
-      int b = reader.read();
-      
-      while (b != -1)
-      {
-         zipHolder.add(b);
-         b = reader.read();
-      }
-      
-      byte[] allCompressed = new byte[zipHolder.size()];
-      for (int i = 0; i < allCompressed.length; i++)
-      {
-         allCompressed[i] = (byte) zipHolder.get(i).intValue();
-      }
-      
-      System.err.println("compressed: " + getBytesString(allCompressed));
-
-/*
-      byte[] buffer = new byte[7];
-      
-      int n = reader.read(buffer);
-      
-      System.err.println("first read: " + n);
-      
-      while (n != -1)
-      {
-         System.err.println("==>read n " + n + " values: " + getBytesString(buffer));
-         n = reader.read(buffer);
-      }
-*/
-      System.err.println("compressed.");
-      
-      System.err.println("now verify");
-      
-      byte[] output = new byte[30];
-      Deflater compresser = new Deflater();
-      compresser.setInput(input);
-      compresser.finish();
-      int compressedDataLength = compresser.deflate(output);
-      System.err.println("compress len: " + compressedDataLength);
-      System.err.println("commpress data: " + getBytesString(output));
-
-   }
-   
-   static String getBytesString(byte[] array)
-   {
-      StringBuffer bf = new StringBuffer();
-      for (byte b : array)
-      {
-         int val = b & 0xFF;
-         bf.append(val + " ");
-      }
-      return bf.toString();
-   }
-   
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/GZipUtil.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -34,6 +34,7 @@
  */
 public class GZipUtil
 {
+   
    public static InputStream createZipInputStream(InputStream input) throws HornetQException
    {
       try

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterReader.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -13,12 +13,9 @@
 
 package org.hornetq.utils;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
 /**
@@ -50,30 +47,17 @@
       this.pointer = -1;
    }
    
-   public static void log(String str)
-   {
-      System.out.println(str);
-   }
-   
    public int read() throws IOException
    {
-      log("in read");
-      
       if (pointer == -1)
       {
-         log("pointer is -1");
-         
          try
          {
-            log("need to decompress more bytes");
             length = doRead(readBuffer, 0, readBuffer.length);
-            log("bytes decompressed:" + length);
             if (length == 0)
             {
-               log("zero byte got, ending");
                return -1;
             }
-            log("reset pointer to zero");
             pointer = 0;
          }
          catch (DataFormatException e)
@@ -82,16 +66,13 @@
          }
       }
       
-      log("reading byte at " + pointer);
       int value = readBuffer[pointer] & 0xFF;
       pointer++;
       if (pointer == length)
       {
-         log("buffer all read, set pointer to -1");
          pointer = -1;
       }
       
-      log("byte got: " + value);
       return value;
    }
    
@@ -153,44 +134,5 @@
       }
       return read;
    }
-   
-   public static void main(String[] args) throws IOException
-   {
-      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
-      byte[] input = inputString.getBytes("UTF-8");
-      byte[] output = new byte[30];
-      Deflater compresser = new Deflater();
-      compresser.setInput(input);
-      compresser.finish();
-      int compressedDataLength = compresser.deflate(output);
-      System.err.println("compress len: " + compressedDataLength);
 
-      byte[] zipBytes = new byte[compressedDataLength];
-      
-      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
-      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
-      
-      InflaterReader inflater = new InflaterReader(byteInput);
-      ArrayList<Integer> holder = new ArrayList<Integer>();
-      int read = inflater.read();
-      
-      while (read != -1)
-      {
-         holder.add(read);
-         read = inflater.read();
-      }
-      
-      byte[] result = new byte[holder.size()];
-      
-      System.out.println("total bytes: " + holder.size());
-      for (int i = 0; i < result.length; i++)
-      {
-         result[i] = holder.get(i).byteValue();
-      }
-      
-      String txt = new String(result);
-      System.out.println("the result: " + txt);
-      
-   }
-
 }

Modified: branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java
===================================================================
--- branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/src/main/org/hornetq/utils/InflaterWriter.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -47,11 +47,6 @@
    {
       this.output = output;
    }
-   
-   private void log(String str)
-   {
-      System.err.println(this + " " + str);
-   }
 
    /*
     * Write a compressed byte.
@@ -59,7 +54,6 @@
    @Override
    public void write(int b) throws IOException
    {
-      log("call write b: " + b);
       writeBuffer[writePointer] = (byte)(b & 0xFF);
       writePointer++;
       
@@ -68,7 +62,6 @@
          writePointer = 0;
          try
          {
-            log("call doWrite");
             doWrite();
          }
          catch (DataFormatException e)
@@ -81,7 +74,6 @@
    @Override
    public void close() throws IOException
    {
-      log("call close");
       if (writePointer > 0)
       {
          inflater.setInput(writeBuffer, 0, writePointer);
@@ -113,43 +105,5 @@
          n = inflater.inflate(outputBuffer);
       }
    }
-   
-   public static void main(String[] args) throws IOException
-   {
-      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
-      byte[] input = inputString.getBytes("UTF-8");
-      byte[] output = new byte[30];
-      Deflater compresser = new Deflater();
-      compresser.setInput(input);
-      compresser.finish();
-      int compressedDataLength = compresser.deflate(output);
-      System.err.println("compress len: " + compressedDataLength);
 
-      byte[] zipBytes = new byte[compressedDataLength];
-      
-      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
-      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
-      
-      ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
-      InflaterWriter writer = new InflaterWriter(byteOutput);
-      
-      byte[] zipBuffer = new byte[12];
-      
-      int n = byteInput.read(zipBuffer);
-      while (n > 0)
-      {
-         System.out.println("Writing: " + n);
-         writer.write(zipBuffer, 0, n);
-         n = byteInput.read(zipBuffer);
-      }
-
-      writer.close();
-      
-      byte[] outcome = byteOutput.toByteArray();
-      String outStr = new String(outcome);
-      
-      System.out.println("Outcome: " + outStr);
-      
-   }
-
 }

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -25,12 +25,15 @@
 
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -736,209 +739,6 @@
       }
    }
 
-   public void testResendSmallStreamMessage() throws Exception
-   {
-      internalTestResendMessage(50000);
-   }
-
-   public void testResendLargeStreamMessage() throws Exception
-   {
-      internalTestResendMessage(150 * 1024);
-   }
-
-   public void internalTestResendMessage(final long messageSize) throws Exception
-   {
-      ClientSession session = null;
-
-      try
-      {
-         server = createServer(true, isNetty());
-
-         server.start();
-
-         ClientSessionFactory sf = createFactory(isNetty());
-         sf.setCompressLargeMessages(true);
-
-         session = sf.createSession(false, false, false);
-
-         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
-         SimpleString ADDRESS2 = LargeMessageTest.ADDRESS.concat("-2");
-
-         session.createQueue(ADDRESS2, ADDRESS2, true);
-
-         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
-         ClientProducer producer2 = session.createProducer(ADDRESS2);
-
-         Message clientFile = createLargeClientMessage(session, messageSize, false);
-
-         producer.send(clientFile);
-
-         session.commit();
-
-         session.start();
-
-         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-         ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
-
-         ClientMessage msg1 = consumer.receive(10000);
-         msg1.acknowledge();
-
-         producer2.send(msg1);
-
-         boolean failed = false;
-
-         try
-         {
-            producer2.send(msg1);
-         }
-         catch (Throwable e)
-         {
-            failed = true;
-         }
-
-         Assert.assertTrue("Exception expected", failed);
-
-         session.commit();
-
-         ClientMessage msg2 = consumer2.receive(10000);
-
-         Assert.assertNotNull(msg2);
-
-         msg2.acknowledge();
-
-         session.commit();
-
-         Assert.assertEquals(messageSize, msg2.getBodySize());
-
-         compareString(messageSize, msg2);
-
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         try
-         {
-            session.close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
-   public void testResendCachedSmallStreamMessage() throws Exception
-   {
-      internalTestResendMessage(50000);
-   }
-
-   public void testResendCachedLargeStreamMessage() throws Exception
-   {
-      internalTestCachedResendMessage(150 * 1024);
-   }
-
-   public void internalTestCachedResendMessage(final long messageSize) throws Exception
-   {
-      ClientSession session = null;
-
-      try
-      {
-         server = createServer(true, isNetty());
-
-         server.start();
-
-         ClientSessionFactory sf = createFactory(isNetty());
-         sf.setCompressLargeMessages(true);
-         
-         sf.setMinLargeMessageSize(111);
-         
-         sf.setCacheLargeMessagesClient(true);
-
-         session = sf.createSession(false, false, false);
-
-         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
-         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
-         Message originalMsg = createLargeClientMessage(session, messageSize, false);
-
-         producer.send(originalMsg);
-
-         session.commit();
-
-         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
-         session.start();
-
-         ClientMessage msgReceived = consumer.receive(10000);
-         msgReceived.acknowledge();
-
-         session.commit();
-
-         compareString(messageSize, msgReceived);
-         
-         msgReceived.getBodyBuffer().readerIndex(0);
-         
-         producer.send(msgReceived);
-
-         session.commit();
-         
-         ClientMessage msgReceived2 = consumer.receive(10000);
-
-         msgReceived2.acknowledge();
-
-         compareString(messageSize, msgReceived2);
-         
-         session.commit();
-
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         try
-         {
-            session.close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
-   /**
-    * @param messageSize
-    * @param msg2
-    */
-   private void compareString(final long messageSize, ClientMessage msg)
-   {
-      assertNotNull(msg);
-      for (long i = 0; i < messageSize; i++)
-      {
-         Assert.assertEquals("position "  + i, UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
-      }
-   }
-
    public void testFilePersistenceOneHugeMessage() throws Exception
    {
       testChunks(false,
@@ -2028,149 +1828,6 @@
 
    }
 
-   public void testSimpleRollback() throws Exception
-   {
-      simpleRollbackInternalTest(false);
-   }
-
-   public void testSimpleRollbackXA() throws Exception
-   {
-      simpleRollbackInternalTest(true);
-   }
-
-   public void simpleRollbackInternalTest(final boolean isXA) throws Exception
-   {
-      // there are two bindings.. one is ACKed, the other is not, the server is restarted
-      // The other binding is acked... The file must be deleted
-
-      try
-      {
-
-         server = createServer(true, isNetty());
-
-         server.start();
-
-         ClientSessionFactory sf = createFactory(isNetty());
-         sf.setCompressLargeMessages(true);
-
-         ClientSession session = sf.createSession(isXA, false, false);
-
-         Xid xid = null;
-
-         if (isXA)
-         {
-            xid = newXID();
-            session.start(xid, XAResource.TMNOFLAGS);
-         }
-
-         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
-
-         int numberOfBytes = 200000;
-
-         session.start();
-
-         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
-
-         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
-         for (int n = 0; n < 10; n++)
-         {
-            Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
-
-            producer.send(clientFile);
-
-            Assert.assertNull(consumer.receiveImmediate());
-
-            if (isXA)
-            {
-               session.end(xid, XAResource.TMSUCCESS);
-               session.rollback(xid);
-               xid = newXID();
-               session.start(xid, XAResource.TMNOFLAGS);
-            }
-            else
-            {
-               session.rollback();
-            }
-
-            clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
-
-            producer.send(clientFile);
-
-            Assert.assertNull(consumer.receiveImmediate());
-
-            if (isXA)
-            {
-               session.end(xid, XAResource.TMSUCCESS);
-               session.commit(xid, true);
-               xid = newXID();
-               session.start(xid, XAResource.TMNOFLAGS);
-            }
-            else
-            {
-               session.commit();
-            }
-
-            for (int i = 0; i < 2; i++)
-            {
-
-               ClientMessage clientMessage = consumer.receive(5000);
-
-               Assert.assertNotNull(clientMessage);
-
-               Assert.assertEquals(numberOfBytes, clientMessage.getBodyBuffer().writerIndex());
-
-               clientMessage.acknowledge();
-
-               if (isXA)
-               {
-                  if (i == 0)
-                  {
-                     session.end(xid, XAResource.TMSUCCESS);
-                     session.prepare(xid);
-                     session.rollback(xid);
-                     xid = newXID();
-                     session.start(xid, XAResource.TMNOFLAGS);
-                  }
-                  else
-                  {
-                     session.end(xid, XAResource.TMSUCCESS);
-                     session.commit(xid, true);
-                     xid = newXID();
-                     session.start(xid, XAResource.TMNOFLAGS);
-                  }
-               }
-               else
-               {
-                  if (i == 0)
-                  {
-                     session.rollback();
-                  }
-                  else
-                  {
-                     session.commit();
-                  }
-               }
-            }
-         }
-
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
-   }
-
    public void testBufferMultipleLargeMessages() throws Exception
    {
       ClientSession session = null;
@@ -2540,68 +2197,7 @@
       }
    }
 
-   // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
-   public void testSendServerMessage() throws Exception
-   {
-      HornetQServer server = createServer(true);
 
-      server.start();
-
-      ClientSessionFactory sf = createFactory(false);
-      sf.setCompressLargeMessages(true);
-
-      ClientSession session = sf.createSession(false, false);
-
-      try
-      {
-         LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
-
-         fileMessage.setMessageID(1005);
-
-         for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
-         {
-            fileMessage.addBytes(new byte[] { UnitTestCase.getSamplebyte(i) });
-         }
-
-         fileMessage.releaseResources();
-
-         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
-
-         ClientProducer prod = session.createProducer(LargeMessageTest.ADDRESS);
-
-         prod.send(fileMessage);
-
-         fileMessage.deleteFile();
-
-         session.commit();
-
-         session.start();
-
-         ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
-
-         ClientMessage msg = cons.receive(5000);
-
-         Assert.assertNotNull(msg);
-
-         Assert.assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
-
-         for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
-         {
-            Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
-         }
-
-         msg.acknowledge();
-
-         session.commit();
-
-      }
-      finally
-      {
-         sf.close();
-         server.stop();
-      }
-   }
-
    public void testLargeMessageCompression() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

Modified: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-01 12:23:36 UTC (rev 9959)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -887,6 +887,7 @@
          }
          catch (Throwable e)
          {
+            log.error("failed", e);
             failed = true;
          }
 

Added: branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java
===================================================================
--- branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	                        (rev 0)
+++ branches/Branch_Large_Message_Compression/tests/src/org/hornetq/tests/unit/util/CompressionUtilTest.java	2010-12-01 14:25:42 UTC (rev 9960)
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.zip.Deflater;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.DeflaterReader;
+import org.hornetq.utils.InflaterReader;
+import org.hornetq.utils.InflaterWriter;
+
+/**
+ * Unit tests for the DeflaterReader, InflaterReader and InflaterWriter stream utilities.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class CompressionUtilTest extends UnitTestCase
+{
+   
+   public void testDeflaterReader() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+      
+      DeflaterReader reader = new DeflaterReader(inputStream);
+
+      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+      int b = reader.read();
+      
+      while (b != -1)
+      {
+         zipHolder.add(b);
+         b = reader.read();
+      }
+      
+      byte[] allCompressed = new byte[zipHolder.size()];
+      for (int i = 0; i < allCompressed.length; i++)
+      {
+         allCompressed[i] = (byte) zipHolder.get(i).intValue();
+      }
+      
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+      
+      compareByteArray(allCompressed, output, compressedDataLength);
+   }
+   
+   public void testDeflaterReader2() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(input);
+      
+      DeflaterReader reader = new DeflaterReader(inputStream);
+
+      byte[] buffer = new byte[7];
+      ArrayList<Integer> zipHolder = new ArrayList<Integer>();
+      
+      int n = reader.read(buffer);
+      while (n != -1)
+      {
+         for (int i = 0; i < n; i++)
+         {
+            zipHolder.add((int)buffer[i]);
+         }
+         n = reader.read(buffer);
+      }
+      
+      byte[] allCompressed = new byte[zipHolder.size()];
+      for (int i = 0; i < allCompressed.length; i++)
+      {
+         allCompressed[i] = (byte) zipHolder.get(i).intValue();
+      }
+      
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+      
+      compareByteArray(allCompressed, output, compressedDataLength);
+   }
+   
+   public void testInflaterReader() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+
+      byte[] zipBytes = new byte[compressedDataLength];
+      
+      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+      
+      InflaterReader inflater = new InflaterReader(byteInput);
+      ArrayList<Integer> holder = new ArrayList<Integer>();
+      int read = inflater.read();
+      
+      while (read != -1)
+      {
+         holder.add(read);
+         read = inflater.read();
+      }
+      
+      byte[] result = new byte[holder.size()];
+      
+      for (int i = 0; i < result.length; i++)
+      {
+         result[i] = holder.get(i).byteValue();
+      }
+      
+      String txt = new String(result);
+      
+      assertEquals(inputString, txt);
+
+   }
+   
+   public void testInflaterWriter() throws Exception
+   {
+      String inputString = "blahblahblah??blahblahblahblahblah??blablahblah??blablahblah??bla";
+      byte[] input = inputString.getBytes("UTF-8");
+      byte[] output = new byte[30];
+      Deflater compresser = new Deflater();
+      compresser.setInput(input);
+      compresser.finish();
+      int compressedDataLength = compresser.deflate(output);
+
+      byte[] zipBytes = new byte[compressedDataLength];
+      
+      System.arraycopy(output, 0, zipBytes, 0, compressedDataLength);
+      ByteArrayInputStream byteInput = new ByteArrayInputStream(zipBytes);
+      
+      ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+      InflaterWriter writer = new InflaterWriter(byteOutput);
+      
+      byte[] zipBuffer = new byte[12];
+      
+      int n = byteInput.read(zipBuffer);
+      while (n > 0)
+      {
+         writer.write(zipBuffer, 0, n);
+         n = byteInput.read(zipBuffer);
+      }
+
+      writer.close();
+      
+      byte[] outcome = byteOutput.toByteArray();
+      String outStr = new String(outcome);
+      
+      assertEquals(inputString, outStr);
+   }
+   
+   private void compareByteArray(byte[] first, byte[] second, int length)
+   {
+      for (int i = 0; i < length; i++)
+      {
+         assertEquals(first[i], second[i]);
+      }
+   }
+}



More information about the hornetq-commits mailing list