[hornetq-commits] JBoss hornetq SVN: r8200 - in trunk: src/main/org/hornetq/core/remoting/impl/wireformat and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 3 17:03:58 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-03 17:03:58 -0500 (Tue, 03 Nov 2009)
New Revision: 8200

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-202 - Fixing flow control on LargeMessage when using Netty

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -480,7 +480,8 @@
       }
 
       // Flow control for the first packet, we will have others
-      flowControl(packet.getPacketSize(), false);
+      // It's using the RequiredBufferSize as the getSize() could be different between transports
+      flowControl(packet.getRequiredBufferSize(), false);
 
       ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
 
@@ -512,7 +513,6 @@
       {
          return;
       }
-
       currentLargeMessageBuffer.addPacket(chunk);
    }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -162,7 +162,7 @@
 
                outStream.write(packet.getBody());
 
-               flowControlCredit = packet.getPacketSize();
+               flowControlCredit = packet.getRequiredBufferSize();
                continues = packet.isContinues();
 
                notifyAll();
@@ -248,7 +248,7 @@
             {
                break;
             }
-            totalFlowControl += packet.getPacketSize();
+            totalFlowControl += packet.getRequiredBufferSize();
             continues = packet.isContinues();
             sendPacketToOutput(output, packet);
          }
@@ -1239,7 +1239,7 @@
             throw new IndexOutOfBoundsException();
          }
 
-         consumerInternal.flowControl(currentPacket.getPacketSize(), !currentPacket.isContinues());
+         consumerInternal.flowControl(currentPacket.getRequiredBufferSize(), !currentPacket.isContinues());
 
          packetPosition += sizeToAdd;
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -31,12 +31,6 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final int SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE = BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
-                                                                       DataConstants.SIZE_LONG +
-                                                                       DataConstants.SIZE_INT +
-                                                                       DataConstants.SIZE_BOOLEAN +
-                                                                       DataConstants.SIZE_INT;
-
    private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
 
    // Attributes ----------------------------------------------------
@@ -142,14 +136,30 @@
    {
       if (largeMessage)
       {
-         return SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + largeMessageHeader.length;
+         return BASIC_PACKET_SIZE + 
+                // consumerID
+                DataConstants.SIZE_LONG +
+                // deliveryCount
+                DataConstants.SIZE_INT +
+                // largeMessage (boolean)
+                DataConstants.SIZE_BOOLEAN +
+                // LargeMessageSize (Long)
+                DataConstants.SIZE_LONG +
+                // largeMessageHeader.length (int)
+                DataConstants.SIZE_INT +
+                // ByteArray size
+                largeMessageHeader.length;
       }
       else
       {
          return BASIC_PACKET_SIZE + 
+                // consumerID
                 DataConstants.SIZE_LONG +
+                // deliveryCount
                 DataConstants.SIZE_INT +
+                // isLargeMessage
                 DataConstants.SIZE_BOOLEAN +
+                // message.encoding
                 (serverMessage != null ? serverMessage.getEncodeSize() : clientMessage.getEncodeSize());
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -331,7 +331,7 @@
 
          if (trace)
          {
-            log.trace("Received " + credits +
+            trace("Received " + credits +
                       " credits, previous value = " +
                       previous +
                       " currentValue = " +
@@ -695,8 +695,8 @@
             if (availableCredits != null)
             {
                // Flow control needs to be done in advance.
-               
-               //Again WHY? Is this necessary now we don't replicate sessions?
+               // If we take out credits as we send, the client would be sending credits back as we are delivering
+               // as a result we would fire up a lot of packets over using the channel.
                precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
             }
             else

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -49,9 +49,14 @@
 
    private static final boolean isTrace = log.isTraceEnabled();
 
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
    private int getMessageEncodeSize(final SimpleString address) throws Exception
    {
-      ClientSessionFactory cf = createInVMFactory();
+      ClientSessionFactory cf = createFactory(isNetty());
       ClientSession session = cf.createSession(false, true, true);
       ClientMessage message = session.createClientMessage(false);
       // we need to set the destination so we can calculate the encodesize correctly
@@ -69,8 +74,8 @@
    * */
    public void testSendWindowSize() throws Exception
    {
-      HornetQServer messagingService = createServer(false);
-      ClientSessionFactory cf = createInVMFactory();
+      HornetQServer messagingService = createServer(false, isNetty());
+      ClientSessionFactory cf = createFactory(isNetty());
       try
       {
          messagingService.start();
@@ -124,7 +129,7 @@
 
    public void testSlowConsumerBufferingOne() throws Exception
    {
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession sessionB = null;
       ClientSession session = null;
@@ -135,7 +140,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
          sf.setConsumerWindowSize(1);
 
          session = sf.createSession(false, true, true);
@@ -216,7 +221,7 @@
 
    private void internalTestSlowConsumerNoBuffer(final boolean largeMessages) throws Exception
    {
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession sessionB = null;
       ClientSession session = null;
@@ -227,7 +232,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
          sf.setConsumerWindowSize(0);
 
          if (largeMessages)
@@ -346,7 +351,7 @@
 
    private void internalTestSlowConsumerNoBuffer2(final boolean largeMessages) throws Exception
    {
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession session1 = null;
       ClientSession session2 = null;
@@ -357,7 +362,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          sf.setConsumerWindowSize(0);
 
@@ -529,7 +534,7 @@
    public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
    {
 
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession sessionB = null;
       ClientSession session = null;
@@ -540,7 +545,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
          sf.setConsumerWindowSize(0);
 
          if (largeMessages)
@@ -694,7 +699,7 @@
 
    private void internalTestSlowConsumerOnMessageHandlerBufferOne(final boolean largeMessage) throws Exception
    {
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession sessionB = null;
       ClientSession session = null;
@@ -705,7 +710,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
          sf.setConsumerWindowSize(1);
 
          if (largeMessage)
@@ -865,7 +870,7 @@
    private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
    {
 
-      HornetQServer server = createServer(false);
+      HornetQServer server = createServer(false, isNetty());
 
       ClientSession sessionA = null;
       ClientSession sessionB = null;
@@ -876,7 +881,7 @@
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
          sf.setConsumerWindowSize(-1);
          if (largeMessages)
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -38,7 +38,6 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.DataConstants;
@@ -73,45 +72,35 @@
 
    // Public --------------------------------------------------------
 
-//   public void testFlowControlWithSyncReceiveNettyZeroConsumerWindowSize() throws Exception
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+///  Those tests are duplicating ConsumerWindowSizeTest and NettyConsumerWindowSizeTest. Do we need those here?
+// 
+//   public void testFlowControlWithSyncReceiveZeroConsumerWindowSize() throws Exception
 //   {
-//      testFlowControlWithSyncReceive(true, 0);
+//      testFlowControlWithSyncReceive(0);
 //   }
-//   
-//   public void testFlowControlWithSyncReceiveInVMZeroConsumerWindowSize() throws Exception
+//
+//   public void testFlowControlWithSyncReceiveSmallConsumerWindowSize() throws Exception
 //   {
-//      testFlowControlWithSyncReceive(false, 0);
+//      testFlowControlWithSyncReceive(1000);
 //   }
-//   
-//   public void testFlowControlWithSyncReceiveNettySmallConsumerWindowSize() throws Exception
+//
+//   private void testFlowControlWithSyncReceive(final int consumerWindowSize) throws Exception
 //   {
-//      testFlowControlWithSyncReceive(true, 1000);
-//   }
-//   
-//   public void testFlowControlWithSyncReceiveInVMSmallConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithSyncReceive(false, 1000);
-//   }
-//   
-//   private void testFlowControlWithSyncReceive(final boolean netty, final int consumerWindowSize) throws Exception
-//   {      
 //      ClientSession session = null;
 //
 //      try
 //      {
-//         if (netty)
-//         {
-//            server = createServer(true, createDefaultConfig(true));
-//         }
-//         else
-//         {
-//            server = createServer(true);
-//         }
+//         server = createServer(true, isNetty());
 //
 //         server.start();
 //
-//         ClientSessionFactory sf = createInVMFactory();
-//         
+//         ClientSessionFactory sf = createFactory(isNetty());
+//
 //         sf.setConsumerWindowSize(consumerWindowSize);
 //         sf.setMinLargeMessageSize(1000);
 //
@@ -130,31 +119,31 @@
 //            Message clientFile = createLargeClientMessage(session, messageSize, true);
 //
 //            producer.send(clientFile);
-//            
+//
 //            log.info("Sent message " + i);
 //         }
 //
 //         ClientConsumer consumer = session.createConsumer(ADDRESS);
-//         
+//
 //         session.start();
-//         
+//
 //         for (int i = 0; i < numMessages; i++)
 //         {
 //            ClientMessage msg = consumer.receive(1000);
-//            
+//
 //            int availBytes = msg.getBody().readableBytes();
-//            
+//
 //            assertEquals(messageSize, availBytes);
-//            
+//
 //            byte[] bytes = new byte[availBytes];
-//            
+//
 //            msg.getBody().readBytes(bytes);
 //
 //            msg.acknowledge();
-//            
+//
 //            log.info("Received message " + i);
 //         }
-//         
+//
 //         session.close();
 //
 //         validateNoFilesOnLargeDir();
@@ -178,151 +167,127 @@
 //         }
 //      }
 //   }
-//   
-//   public void testFlowControlWithListenerNettyZeroConsumerWindowSize() throws Exception
+//
+//   public void testFlowControlWithListenerZeroConsumerWindowSize() throws Exception
 //   {
-//      testFlowControlWithListener(true, 0);
+//      testFlowControlWithListener(0);
 //   }
-//   
-//   public void testFlowControlWithListenerInVMZeroConsumerWindowSize() throws Exception
+//
+//   public void testFlowControlWithListenerSmallConsumerWindowSize() throws Exception
 //   {
-//      testFlowControlWithListener(false, 0);
+//      testFlowControlWithListener(1000);
 //   }
-//   
-//   public void testFlowControlWithListenerNettySmallConsumerWindowSize() throws Exception
+//
+//   private void testFlowControlWithListener(final int consumerWindowSize) throws Exception
 //   {
-//      testFlowControlWithListener(true, 1000);
+//      ClientSession session = null;
+//
+//      try
+//      {
+//         server = createServer(true, isNetty());
+//
+//         server.start();
+//
+//         ClientSessionFactory sf;
+//
+//         sf = createFactory(isNetty());
+//
+//         sf.setConsumerWindowSize(consumerWindowSize);
+//         sf.setMinLargeMessageSize(1000);
+//
+//         final int messageSize = 10000;
+//
+//         session = sf.createSession(false, true, true);
+//
+//         session.createTemporaryQueue(ADDRESS, ADDRESS);
+//
+//         ClientProducer producer = session.createProducer(ADDRESS);
+//
+//         final int numMessages = 1000;
+//
+//         for (int i = 0; i < numMessages; i++)
+//         {
+//            Message clientFile = createLargeClientMessage(session, messageSize, false);
+//
+//            producer.send(clientFile);
+//
+//            log.info("Sent message " + i);
+//         }
+//
+//         ClientConsumer consumer = session.createConsumer(ADDRESS);
+//
+//         class MyHandler implements MessageHandler
+//         {
+//            int count = 0;
+//
+//            final CountDownLatch latch = new CountDownLatch(1);
+//
+//            volatile Exception exception;
+//
+//            public void onMessage(ClientMessage message)
+//            {
+//               try
+//               {
+//                  log.info("got message " + count);
+//
+//                  int availBytes = message.getBody().readableBytes();
+//
+//                  assertEquals(messageSize, availBytes);
+//
+//                  byte[] bytes = new byte[availBytes];
+//
+//                  message.getBody().readBytes(bytes);
+//
+//                  message.acknowledge();
+//
+//                  if (++count == numMessages)
+//                  {
+//                     latch.countDown();
+//                  }
+//               }
+//               catch (Exception e)
+//               {
+//                  log.error("Failed to handle message", e);
+//
+//                  this.exception = e;
+//               }
+//            }
+//         }
+//
+//         MyHandler handler = new MyHandler();
+//
+//         consumer.setMessageHandler(handler);
+//
+//         session.start();
+//
+//         handler.latch.await(10000, TimeUnit.MILLISECONDS);
+//
+//         assertNull(handler.exception);
+//
+//         session.close();
+//
+//         validateNoFilesOnLargeDir();
+//      }
+//      finally
+//      {
+//         try
+//         {
+//            server.stop();
+//         }
+//         catch (Throwable ignored)
+//         {
+//         }
+//
+//         try
+//         {
+//            session.close();
+//         }
+//         catch (Throwable ignored)
+//         {
+//         }
+//      }
 //   }
-//   
-//   public void testFlowControlWithListenerInVMSmallConsumerWindowSize() throws Exception
-//   {
-//      testFlowControlWithListener(false, 1000);
-//   }
-   
-   private void testFlowControlWithListener(final boolean netty, final int consumerWindowSize) throws Exception
-   {      
-      ClientSession session = null;
 
-      try
-      {
-         if (netty)
-         {
-            server = createServer(true, createDefaultConfig(true));
-         }
-         else
-         {
-            server = createServer(true);
-         }
-
-         server.start();
-
-         ClientSessionFactory sf;
-         
-         if (netty)
-         {
-            sf = createNettyFactory();
-         }
-         else
-         {
-            sf = createInVMFactory();
-         }
-         
-         sf.setConsumerWindowSize(consumerWindowSize);
-         sf.setMinLargeMessageSize(1000);
-
-         final int messageSize = 10000;
-
-         session = sf.createSession(false, true, true);
-
-         session.createTemporaryQueue(ADDRESS, ADDRESS);
-
-         ClientProducer producer = session.createProducer(ADDRESS);
-
-         final int numMessages = 1000;
-
-         for (int i = 0; i < numMessages; i++)
-         {
-            Message clientFile = createLargeClientMessage(session, messageSize, false);
-
-            producer.send(clientFile);
-            
-            log.info("Sent message " + i);
-         }
-
-         ClientConsumer consumer = session.createConsumer(ADDRESS);
-         
-         class MyHandler implements MessageHandler
-         {
-            int count = 0;
-
-            final CountDownLatch latch = new CountDownLatch(1);
-
-            volatile Exception exception;
-
-            public void onMessage(ClientMessage message)
-            {
-               try
-               {
-                  log.info("got message " + count);
-                  
-                  int availBytes = message.getBody().readableBytes();
-                  
-                  assertEquals(messageSize, availBytes);
-                  
-                  byte[] bytes = new byte[availBytes];
-                  
-                  message.getBody().readBytes(bytes);
-                  
-                  message.acknowledge();
-
-                  if (++count == numMessages)
-                  {
-                     latch.countDown();
-                  }
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to handle message", e);
-
-                  this.exception = e;
-               }
-            }
-         }
-         
-         MyHandler handler = new MyHandler();
-         
-         consumer.setMessageHandler(handler);
-         
-         session.start();
-         
-         handler.latch.await(10000, TimeUnit.MILLISECONDS);
-         
-         assertNull(handler.exception);
-         
-         session.close();
-
-         validateNoFilesOnLargeDir();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-
-         try
-         {
-            session.close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -331,11 +296,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -400,11 +365,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -453,11 +418,11 @@
          session.close();
          server.stop();
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         sf = createInVMFactory();
+         sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -527,11 +492,12 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
+         
 
          session = sf.createSession(false, false, false);
 
@@ -607,11 +573,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -742,11 +708,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
          SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -818,11 +784,11 @@
          session.close();
          server.stop();
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         sf = createInVMFactory();
+         sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -877,7 +843,7 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
@@ -889,7 +855,7 @@
 
          server.getAddressSettingsRepository().addMatch("*", addressSettings);
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -927,11 +893,11 @@
          session.close();
          server.stop();
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         sf = createInVMFactory();
+         sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -992,11 +958,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          session = sf.createSession(false, false, false);
 
@@ -1502,11 +1468,6 @@
       testPageOnLargeMessage(true, false);
    }
 
-   public void testPageOnLargeMessageNullPersistence() throws Exception
-   {
-      testPageOnLargeMessage(false, false);
-   }
-
    public void testSendSmallMessageXA() throws Exception
    {
       testChunks(true, false, true, false, true, false, false, true, false, 100, 4, RECEIVE_WAIT_TIME, 0);
@@ -1595,13 +1556,13 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1678,13 +1639,13 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
          SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1708,11 +1669,11 @@
 
             server.stop();
 
-            server = createServer(true);
+            server = createServer(true, isNetty());
 
             server.start();
 
-            sf = createInVMFactory();
+            sf = createFactory(isNetty());
 
             session = sf.createSession(null, null, false, true, true, false, 0);
          }
@@ -1762,11 +1723,11 @@
 
       try
       {
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          session = sf.createSession(isXA, false, false);
 
@@ -1849,11 +1810,11 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          ClientSession session = sf.createSession(isXA, false, false);
 
@@ -1983,11 +1944,11 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          sf.setMinLargeMessageSize(1024);
          sf.setConsumerWindowSize(1024 * 1024);
@@ -2085,11 +2046,11 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          sf.setMinLargeMessageSize(1024);
          sf.setConsumerWindowSize(1024 * 1024);
@@ -2187,11 +2148,11 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          sf.setMinLargeMessageSize(100 * 1024);
 
@@ -2260,11 +2221,11 @@
       try
       {
 
-         server = createServer(true);
+         server = createServer(true, isNetty());
 
          server.start();
 
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          sf.setMinLargeMessageSize(1024);
 
@@ -2357,7 +2318,7 @@
 
    protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
    {
-      Configuration config = createDefaultConfig();
+      Configuration config = createDefaultConfig(isNetty());
 
       final int PAGE_MAX = 20 * 1024;
 
@@ -2366,7 +2327,6 @@
       HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
 
       AddressSettings value = new AddressSettings();
-      value.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       map.put(ADDRESS.toString(), value);
       server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
       server.start();
@@ -2377,7 +2337,7 @@
 
       try
       {
-         ClientSessionFactory sf = createInVMFactory();
+         ClientSessionFactory sf = createFactory(isNetty());
 
          if (sendBlocking)
          {
@@ -2431,7 +2391,7 @@
             server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
             server.start();
 
-            sf = createInVMFactory();
+            sf = createFactory(isNetty());
          }
 
          session = sf.createSession(null, null, false, true, true, false, 0);

Added: trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * A NettyConsumerWindowSizeTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NettyConsumerWindowSizeTest extends ConsumerWindowSizeTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   
+   protected boolean isNetty()
+   {
+      return true;
+   }
+   
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-03 18:27:35 UTC (rev 8199)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-03 22:03:58 UTC (rev 8200)
@@ -143,9 +143,15 @@
 
    protected HornetQServer createServer(final boolean realFiles)
    {
-      return createServer(realFiles, createDefaultConfig(), -1, -1, new HashMap<String, AddressSettings>());
+      return createServer(realFiles, false);
    }
 
+
+   protected HornetQServer createServer(final boolean realFiles, final boolean netty)
+   {
+      return createServer(realFiles, createDefaultConfig(netty), -1, -1, new HashMap<String, AddressSettings>());
+   }
+
    protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
    {
       return createServer(realFiles, configuration, -1, -1, new HashMap<String, AddressSettings>());
@@ -289,6 +295,18 @@
       return configuration;
    }
 
+   protected ClientSessionFactory createFactory(boolean isNetty)
+   {
+      if (isNetty)
+      {
+         return createNettyFactory();
+      }
+      else
+      {
+         return createInVMFactory();
+      }
+   }
+   
    protected ClientSessionFactory createInVMFactory()
    {
       return createFactory(INVM_CONNECTOR_FACTORY);



More information about the hornetq-commits mailing list