[hornetq-commits] JBoss hornetq SVN: r10063 - in trunk: src/main/org/hornetq/jms/client and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 20 20:50:48 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-20 20:50:48 -0500 (Mon, 20 Dec 2010)
New Revision: 10063

Modified:
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
fixing tests on Large Message

Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-12-21 01:50:48 UTC (rev 10063)
@@ -1730,6 +1730,13 @@
                try
                {
                   msg = sourceConsumer.receive(1000);
+                  
+                  if (msg instanceof HornetQMessage)
+                  {
+                     // We need to check the buffer mainly in the case of LargeMessages
+                     // As we need to reconstruct the buffer before resending the message
+                     ((HornetQMessage)msg).checkBuffer();
+                  }
                }
                catch (JMSException jmse)
                {

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2010-12-21 01:50:48 UTC (rev 10063)
@@ -879,6 +879,11 @@
    {
       message.getBodyBuffer().resetReaderIndex();
    }
+   
+   public void checkBuffer()
+   {
+      message.getBodyBuffer();
+   }
 
    public void doBeforeReceive() throws Exception
    {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageCompressTest.java	2010-12-21 01:50:48 UTC (rev 10063)
@@ -27,6 +27,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.tests.util.RandomUtil;
 
 /**
  * A LargeMessageCompressTest
@@ -277,6 +278,111 @@
    }
 
 
+
+   public void testLargeMessageCompressionRestartAndCheckSize() throws Exception
+   {
+      final int messageSize = 1024 * 1024;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+         
+         byte [] msgs = new byte[1024 * 1024];
+         for (int i = 0 ; i < msgs.length; i++)
+         {
+            msgs[i] = RandomUtil.randomByte();
+         }
+
+         Message clientFile = createLargeClientMessage(session, msgs, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+         
+         session.close();
+         
+         sf.close();
+         
+         locator.close();
+         
+         server.stop();
+         
+         server = createServer(true, isNetty());
+         
+         server.start();
+         
+         locator = createFactory(isNetty());
+         
+         sf = locator.createSessionFactory();
+         
+         session = sf.createSession();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         ClientMessage msg1 = consumer.receive(1000);
+         Assert.assertNotNull(msg1);
+         
+         assertEquals(messageSize, msg1.getBodySize());
+         
+         String testDir = this.getTestDir();
+         File testFile = new File(testDir, "async_large_message");
+         FileOutputStream output = new FileOutputStream(testFile);
+
+         msg1.saveToOutputStream(output);
+         
+         msg1.acknowledge();
+
+         session.commit();
+
+         consumer.close();
+
+         session.close();
+
+         //verify
+         FileInputStream input = new FileInputStream(testFile);
+         for (int i = 0 ; i < messageSize; i++)
+         {
+            byte b = (byte)input.read();
+            assertEquals("position = "  + i, msgs[i], b);
+         }
+         
+         testFile.delete();
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+
    public void testSendServerMessage() throws Exception
    {
       // doesn't make sense as compressed

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java	2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java	2010-12-21 01:50:48 UTC (rev 10063)
@@ -100,7 +100,7 @@
 
          public long countMessages(final String filter) throws Exception
          {
-            return (Long)proxy.invokeOperation("countMessages", filter);
+            return ((Number)proxy.invokeOperation("countMessages", filter)).intValue();
          }
 
          public boolean expireMessage(final String messageID) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2010-12-20 15:10:20 UTC (rev 10062)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2010-12-21 01:50:48 UTC (rev 10063)
@@ -617,6 +617,13 @@
       return createLargeClientMessage(session, numberOfBytes, true);
    }
 
+   protected ClientMessage createLargeClientMessage (final ClientSession session, final byte[] buffer, final boolean durable) throws Exception
+   {
+      ClientMessage msgs = session.createMessage(durable);
+      msgs.getBodyBuffer().writeBytes(buffer);
+      return msgs;
+   }
+
    protected ClientMessage createLargeClientMessage(final ClientSession session,
                                                     final long numberOfBytes,
                                                     final boolean persistent) throws Exception



More information about the hornetq-commits mailing list