[hornetq-commits] JBoss hornetq SVN: r9171 - in trunk: src/main/org/hornetq/core/message/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Apr 26 20:41:56 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-04-26 20:41:55 -0400 (Mon, 26 Apr 2010)
New Revision: 9171

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
HORNETQ-296 - LargeMessage and producer

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -25,6 +25,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.utils.DataConstants;
 
 /**
  * 
@@ -249,6 +250,7 @@
 
       public void open()
       {
+         getBodyBuffer().readerIndex(0);
       }
 
       public void close()
@@ -257,7 +259,14 @@
 
       public long getLargeBodySize()
       {
-         return buffer.writerIndex();
+         if (isLargeMessage())
+         {
+            return getBodyBuffer().writerIndex();
+         }
+         else
+         {
+            return getBodyBuffer().writerIndex() - BODY_OFFSET;
+         }
       }
 
       public int encode(final ByteBuffer bufferRead) throws HornetQException

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -217,7 +217,7 @@
       
       boolean isLarge;
 
-      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage())
+      if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
       {
          isLarge = true;
       }

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
@@ -60,6 +61,8 @@
    private volatile SessionReceiveContinuationMessage currentPacket = null;
 
    private final long totalSize;
+   
+   private final int bufferSize;
 
    private boolean streamEnded = false;
 
@@ -97,6 +100,15 @@
                                  final int readTimeout,
                                  final File cachedFile)
    {
+      this(consumerInternal, totalSize, readTimeout, cachedFile, 10 * 1024);
+   }
+
+   public LargeMessageBufferImpl(final ClientConsumerInternal consumerInternal,
+                                 final long totalSize,
+                                 final int readTimeout,
+                                 final File cachedFile,
+                                 final int bufferSize)
+   {
       this.consumerInternal = consumerInternal;
       this.readTimeout = readTimeout;
       this.totalSize = totalSize;
@@ -108,6 +120,7 @@
       {
          fileCache = new FileCache(cachedFile);
       }
+      this.bufferSize = bufferSize;
    }
 
    // Public --------------------------------------------------------
@@ -193,6 +206,7 @@
                }
             }
 
+            
             packets.offer(packet);
          }
       }
@@ -1020,7 +1034,30 @@
    {
       return (char)readShort();
    }
+   
+   public char getChar(final int index)
+   {
+      return (char)getShort(index);
+   }
 
+   public double getDouble(final int index)
+   {
+      return Double.longBitsToDouble(getLong(index));
+   }
+
+   public float getFloat(final int index)
+   {
+      return Float.intBitsToFloat(getInt(index));
+   }
+
+   public HornetQBuffer readBytes(final int length)
+   {
+      byte bytesToGet[] = new byte[length];
+      getBytes(readerIndex, bytesToGet);
+      readerIndex += length;
+      return HornetQBuffers.wrappedBuffer(bytesToGet);
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.spi.core.remoting.HornetQBuffer#readDouble()
     */
@@ -1313,8 +1350,6 @@
    private class FileCache
    {
 
-      private final int BUFFER_SIZE = 10 * 1024;
-
       public FileCache(final File cachedFile)
       {
          this.cachedFile = cachedFile;
@@ -1347,11 +1382,13 @@
                   throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
                }
 
-               readCachePositionStart = position / BUFFER_SIZE * BUFFER_SIZE;
+               readCachePositionStart = position / bufferSize * bufferSize;
+               
+               cachedChannel.position(readCachePositionStart);
 
                if (readCache == null)
                {
-                  readCache = ByteBuffer.allocate(BUFFER_SIZE);
+                  readCache = ByteBuffer.allocate(bufferSize);
                }
 
                readCache.clear();
@@ -1452,80 +1489,47 @@
 
    public ChannelBuffer channelBuffer()
    {
-      // TODO Auto-generated method stub
       return null;
    }
 
    public HornetQBuffer copy(final int index, final int length)
    {
-      // TODO Auto-generated method stub
-      return null;
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public HornetQBuffer duplicate()
    {
-      // TODO Auto-generated method stub
-      return null;
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
-   public char getChar(final int index)
-   {
-      // TODO Auto-generated method stub
-      return 0;
-   }
-
-   public double getDouble(final int index)
-   {
-      // TODO Auto-generated method stub
-      return 0;
-   }
-
-   public float getFloat(final int index)
-   {
-      // TODO Auto-generated method stub
-      return 0;
-   }
-
-   public HornetQBuffer readBytes(final int length)
-   {
-      // TODO Auto-generated method stub
-      return null;
-   }
-
    public HornetQBuffer readSlice(final int length)
    {
-      // TODO Auto-generated method stub
-      return null;
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public void setChar(final int index, final char value)
    {
-      // TODO Auto-generated method stub
-
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public void setDouble(final int index, final double value)
    {
-      // TODO Auto-generated method stub
-
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public void setFloat(final int index, final float value)
    {
-      // TODO Auto-generated method stub
-
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public HornetQBuffer slice()
    {
-      // TODO Auto-generated method stub
-      return null;
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
    public void writeBytes(final HornetQBuffer src, final int srcIndex, final int length)
    {
-      // TODO Auto-generated method stub
-
+      throw new IllegalAccessError(LargeMessageBufferImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
 }

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -55,7 +55,10 @@
    public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_HQ_ROUTE_TO");
 
    public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
+   
+   public static final int BODY_OFFSET = BUFFER_HEADER_SPACE + DataConstants.SIZE_INT;
 
+
    protected long messageID;
 
    protected SimpleString address;
@@ -248,7 +251,7 @@
       {
          if (buffer instanceof LargeMessageBufferInternal == false)
          {
-            bodyBuffer = new ResetLimitWrappedHornetQBuffer(BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, buffer, this);
+            bodyBuffer = new ResetLimitWrappedHornetQBuffer(BODY_OFFSET, buffer, this);
          }
          else
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -137,6 +137,109 @@
       }
    }
 
+
+   public void testLargeBufferTransacted() throws Exception
+   {
+      doTestLargeBuffer(true);
+   }
+   
+   public void testLargeBufferNotTransacted() throws Exception
+   {
+      doTestLargeBuffer(false);
+   }
+   
+   public void doTestLargeBuffer(boolean transacted) throws Exception
+   {
+      final int journalsize = 100 * 1024;
+      final int messageSize = 3 * journalsize;
+      // final int messageSize = 5 * 1024;
+
+      ClientSession session = null;
+
+      try
+      {
+         Configuration config = createDefaultConfig(isNetty());
+         config.setJournalFileSize(journalsize);
+         
+         config.setJournalBufferSize_AIO(10 * 1024);
+         config.setJournalBufferSize_NIO(10 * 1024);
+         
+         server = createServer(true, config);
+
+         server.start();
+
+         ClientSessionFactory sf = createFactory(isNetty());
+
+         session = sf.createSession(!transacted, !transacted, 0);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message clientFile = session.createMessage(true);
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            clientFile.getBodyBuffer().writeByte(getSamplebyte(i));
+         }
+         
+
+         producer.send(clientFile);
+
+         if (transacted)
+         {
+            session.commit();
+         }
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(5000);
+         assertNotNull(msg1);
+         
+         Assert.assertNotNull(msg1);
+         
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            //System.out.print(msg1.getBodyBuffer().readByte() + "  ");
+            //if (i % 100 == 0) System.out.println();
+            assertEquals("position = "  + i, getSamplebyte(i), msg1.getBodyBuffer().readByte());
+         }
+       
+         msg1.acknowledge();
+        
+         consumer.close();
+         
+         
+         if (transacted)
+         {
+            session.commit();
+         }
+         
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testDLALargeMessage() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -799,11 +902,95 @@
 
          Assert.assertEquals(messageSize, msg2.getBodySize());
 
-         for (int i = 0; i < messageSize; i++)
+         compareString(messageSize, msg2);
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
          {
-            Assert.assertEquals(UnitTestCase.getSamplebyte(i), msg2.getBodyBuffer().readByte());
+            server.stop();
          }
+         catch (Throwable ignored)
+         {
+         }
 
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testResendCachedSmallStreamMessage() throws Exception
+   {
+      internalTestResendMessage(50000);
+   }
+
+   public void testResendCachedLargeStreamMessage() throws Exception
+   {
+      internalTestCachedResendMessage(150 * 1024);
+   }
+
+   public void internalTestCachedResendMessage(final long messageSize) throws Exception
+   {
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = createFactory(isNetty());
+         
+         sf.setMinLargeMessageSize(111);
+         
+         sf.setCacheLargeMessagesClient(true);
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         Message originalMsg = createLargeClientMessage(session, messageSize, false);
+
+         producer.send(originalMsg);
+
+         session.commit();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         session.start();
+
+         ClientMessage msgReceived = consumer.receive(10000);
+         msgReceived.acknowledge();
+
+         session.commit();
+
+         compareString(messageSize, msgReceived);
+         
+         msgReceived.getBodyBuffer().readerIndex(0);
+         
+         producer.send(msgReceived);
+
+         session.commit();
+         
+         ClientMessage msgReceived2 = consumer.receive(10000);
+
+         msgReceived2.acknowledge();
+
+         compareString(messageSize, msgReceived2);
+         
+         session.commit();
+
          session.close();
 
          validateNoFilesOnLargeDir();
@@ -828,6 +1015,19 @@
       }
    }
 
+   /**
+    * @param messageSize
+    * @param msg
+    */
+   private void compareString(final long messageSize, ClientMessage msg)
+   {
+      assertNotNull(msg);
+      for (long i = 0; i < messageSize; i++)
+      {
+         Assert.assertEquals("position "  + i, UnitTestCase.getSamplebyte(i), msg.getBodyBuffer().readByte());
+      }
+   }
+
    public void testFilePersistenceOneHugeMessage() throws Exception
    {
       testChunks(false,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -239,7 +239,7 @@
                                         HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
                                         HornetQClient.DEFAULT_CONNECTION_TTL,
                                         callTimeout,
-                                        HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+                                        true,
                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
                                         HornetQClient.DEFAULT_CONSUMER_MAX_RATE,

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/largemessage/JMSLargeMessageTest.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -33,6 +33,8 @@
 
 import org.hornetq.tests.util.JMSTestBase;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUID;
+import org.hornetq.utils.UUIDGenerator;
 
 /**
  *
@@ -338,6 +340,68 @@
 
    }
 
+
+   public void testHugeString() throws Exception
+   {
+      int msgSize = 1024 * 1024;
+
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = session.createProducer(queue1);
+
+         TextMessage m = session.createTextMessage();
+         
+         StringBuffer buffer = new StringBuffer();
+         while(buffer.length() < msgSize)
+         {
+            buffer.append(UUIDGenerator.getInstance().generateStringUUID());
+         }
+         
+         final String originalString = buffer.toString();
+         
+         m.setText(originalString);
+         
+         buffer = null;
+
+         prod.send(m);
+
+         conn.close();
+         
+         validateNoFilesOnLargeDir(1);
+
+         conn = cf.createConnection();
+
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         conn.start();
+
+         TextMessage rm = (TextMessage)cons.receive(10000);
+         Assert.assertNotNull(rm);
+
+         String str = rm.getText();
+         assertEquals(originalString, str);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+      
+      validateNoFilesOnLargeDir(0);
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-04-26 16:25:35 UTC (rev 9170)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-04-27 00:41:55 UTC (rev 9171)
@@ -56,12 +56,29 @@
 
    // Attributes ----------------------------------------------------
 
+   static int tmpFileCounter = 0; 
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
 
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      tmpFileCounter++;
+
+      File tmp = new File(getTestDir());
+      tmp.mkdirs();
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
    // Test Simple getBytes
    public void testGetBytes() throws Exception
    {
@@ -193,7 +210,7 @@
 
    private File getTestFile()
    {
-      return new File(getTestDir(), "temp.file");
+      return new File(getTestDir(), "temp." + tmpFileCounter + ".file");
    }
 
    public void testReadDataOverCached() throws Exception
@@ -298,7 +315,49 @@
          Assert.assertEquals(i, bytes[i]);
       }
    }
+   
+   public void testSplitBufferOnFile() throws Exception
+   {
+      LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
+                                                                          1024 * 1024,
+                                                                          1,
+                                                                          getTestFile(),
+                                                                          1024);
+      try
+      {
 
+         long count = 0;
+         for (int i = 0; i < 10; i++)
+         {
+            byte buffer[] = new byte[10240];
+            for (int j = 0; j < 10240; j++)
+            {
+               buffer[j] = getSamplebyte(count++);
+            }
+            outBuffer.addPacket(new FakePacket(1, buffer, true, false));
+         }
+
+         outBuffer.readerIndex(0);
+
+         for (int i = 0; i < 10240 * 10; i++)
+         {
+            assertEquals("position " + i, getSamplebyte(i), outBuffer.readByte());
+         }
+
+         outBuffer.readerIndex(0);
+
+         for (int i = 0; i < 10240 * 10; i++)
+         {
+            assertEquals("position " + i, getSamplebyte(i), outBuffer.readByte());
+         }
+      }
+      finally
+      {
+         outBuffer.close();
+      }
+
+   }
+
    public void testStreamData() throws Exception
    {
       final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
@@ -727,7 +786,7 @@
 
       public void stop(boolean waitForOnMessage) throws HornetQException
       {
-         //To change body of implemented methods use File | Settings | File Templates.
+         // To change body of implemented methods use File | Settings | File Templates.
       }
 
       public SessionQueueQueryResponseMessage getQueueInfo()



More information about the hornetq-commits mailing list