[jboss-cvs] JBoss Messaging SVN: r6639 - in trunk/tests/src/org/jboss/messaging/tests: integration/chunkmessage and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 30 13:09:51 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-30 13:09:51 -0400 (Thu, 30 Apr 2009)
New Revision: 6639

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/
Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
   trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Removing verbose output and renaming the test-suite package chunkmessage to largemessage

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java	2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
-
-/**
- * A LargeMessageCleanupTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCleanupTest extends LargeMessageTestBase
-{
-   // Constants -----------------------------------------------------
-   
-   private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
-
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   
-   public void testCleanup() throws Exception
-   {
-      clearData();
-      
-      FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
-      
-      fileOut.write(new byte[1024]); // anything
-      
-      fileOut.close();
-
-      Configuration config = createDefaultConfig();
-
-      server = createServer(true, config, new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      try
-      {
-
-         File directoryLarge = new File(getLargeMessagesDir());
-
-         assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
-      }
-      finally
-      {
-         server.stop();
-      }
-   }
-
-   public void testFailureOnSendingFile() throws Exception
-   {
-      clearData();
-
-      Configuration config = createDefaultConfig();
-
-      config.setPagingMaxGlobalSizeBytes(20 * 1024);
-      config.setPagingGlobalWatermarkSize(10 * 1024);
-
-      server = createServer(true, config, new HashMap<String, AddressSettings>());
-
-      server.start();
-
-      final int numberOfBytes = 2 * 1024 * 1024;
-
-      ClientSession session = null;
-
-      class LocalCallback implements MockConnector.MockCallback
-      {
-         AtomicInteger counter = new AtomicInteger(0);
-
-         ClientSession session;
-
-         public void onWrite(final MessagingBuffer buffer)
-         {
-            log.info("calling cb onwrite** ");
-            if (counter.incrementAndGet() == 5)
-            {
-               RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
-               RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
-               remotingServiceImpl.connectionException(conn.getID(),
-                                                       new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
-               conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
-               throw new IllegalStateException("blah");
-            }
-         }
-      }
-
-      LocalCallback callback = new LocalCallback();
-
-      try
-      {
-         HashMap<String, Object> parameters = new HashMap<String, Object>();
-         parameters.put("callback", callback);
-
-         TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
-                                                                       parameters);
-
-         ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
-         mockFactory.setBlockOnNonPersistentSend(false);
-         mockFactory.setBlockOnPersistentSend(false);
-         mockFactory.setBlockOnAcknowledge(false);
-
-         session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
-         callback.session = session;
-
-         session.createQueue(ADDRESS, ADDRESS, null, true);
-
-         ClientProducer producer = session.createProducer(ADDRESS);
-
-         ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
-
-         try
-         {
-            producer.send(clientLarge);
-            
-            fail("Exception was expected!");
-         }
-         catch (Exception e)
-         {
-         }
-
-         validateNoFilesOnLargeDir();
-
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Exception ignored)
-         {
-            ignored.printStackTrace();
-         }
-      }
-
-   }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java	2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -1,670 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A LargeMessageTestBase
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
- * Created Oct 29, 2008 11:43:52 AM
- *
- *
- */
-public class LargeMessageTestBase extends ServiceTestBase
-{
-
-   // Constants -----------------------------------------------------
-   private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
-
-   protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
-   protected MessagingServer server;
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void testChunks(final boolean isXA,
-                             final boolean rollbackFirstSend,
-                             final boolean useStreamOnConsume,
-                             final boolean realFiles,
-                             final boolean preAck,
-                             final boolean sendingBlocking,
-                             final boolean testBrowser,
-                             final boolean useMessageConsumer,
-                             final int numberOfMessages,
-                             final long numberOfBytes,
-                             final int waitOnConsumer,
-                             final long delayDelivery) throws Exception
-   {
-      testChunks(isXA,
-                 rollbackFirstSend,
-                 useStreamOnConsume,
-                 realFiles,
-                 preAck,
-                 sendingBlocking,
-                 testBrowser,
-                 useMessageConsumer,
-                 numberOfMessages,
-                 numberOfBytes,
-                 waitOnConsumer,
-                 delayDelivery,
-                 -1,
-                 10 * 1024);
-   }
-
-   protected void testChunks(final boolean isXA,
-                             final boolean rollbackFirstSend,
-                             final boolean useStreamOnConsume,
-                             final boolean realFiles,
-                             final boolean preAck,
-                             final boolean sendingBlocking,
-                             final boolean testBrowser,
-                             final boolean useMessageConsumer,
-                             final int numberOfMessages,
-                             final long numberOfBytes,
-                             final int waitOnConsumer,
-                             final long delayDelivery,
-                             final int producerWindow,
-                             final int minSize) throws Exception
-   {
-      clearData();
-
-      server = createServer(realFiles);
-      server.start();
-
-      try
-      {
-         ClientSessionFactory sf = createInVMFactory();
-
-         if (sendingBlocking)
-         {
-            sf.setBlockOnNonPersistentSend(true);
-            sf.setBlockOnPersistentSend(true);
-            sf.setBlockOnAcknowledge(true);
-         }
-
-         if (producerWindow > 0)
-         {
-            sf.setProducerWindowSize(producerWindow);
-         }
-
-         sf.setMinLargeMessageSize(minSize);
-
-         ClientSession session;
-
-         Xid xid = null;
-         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
-         if (isXA)
-         {
-            xid = newXID();
-            session.start(xid, XAResource.TMNOFLAGS);
-         }
-
-         session.createQueue(ADDRESS, ADDRESS, null, true);
-
-         ClientProducer producer = session.createProducer(ADDRESS);
-
-         if (rollbackFirstSend)
-         {
-            sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
-            if (isXA)
-            {
-               session.end(xid, XAResource.TMSUCCESS);
-               session.rollback(xid);
-               xid = newXID();
-               session.start(xid, XAResource.TMNOFLAGS);
-            }
-            else
-            {
-               session.rollback();
-            }
-
-            validateNoFilesOnLargeDir();
-         }
-
-         sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
-         if (isXA)
-         {
-            session.end(xid, XAResource.TMSUCCESS);
-            session.commit(xid, true);
-            xid = newXID();
-            session.start(xid, XAResource.TMNOFLAGS);
-         }
-         else
-         {
-            session.commit();
-         }
-
-         session.close();
-
-         if (realFiles)
-         {
-            server.stop();
-
-            server = createServer(realFiles);
-            server.start();
-
-            sf = createInVMFactory();
-         }
-
-         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
-         if (isXA)
-         {
-            xid = newXID();
-            session.start(xid, XAResource.TMNOFLAGS);
-         }
-
-         ClientConsumer consumer = null;
-
-         for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
-         {
-
-            System.out.println("Iteration: " + iteration);
-
-            session.stop();
-
-            // first time with a browser
-            consumer = session.createConsumer(ADDRESS, null, iteration == 0);
-
-            if (useMessageConsumer)
-            {
-               final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
-               final AtomicInteger errors = new AtomicInteger(0);
-
-               MessageHandler handler = new MessageHandler()
-               {
-                  int msgCounter;
-
-                  public void onMessage(final ClientMessage message)
-                  {
-
-                     try
-                     {
-                        System.out.println("Message on consumer: " + msgCounter);
-
-                        if (delayDelivery > 0)
-                        {
-                           long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
-                           assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
-                                      System.currentTimeMillis() - originalTime >= delayDelivery);
-                        }
-
-                        if (!preAck)
-                        {
-                           message.acknowledge();
-                        }
-
-                        assertNotNull(message);
-
-                        if (delayDelivery <= 0)
-                        {
-                           // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
-                           // the same
-                           // scheduled delivery time
-                           assertEquals(msgCounter,
-                                        ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
-                        }
-
-                        if (useStreamOnConsume)
-                        {
-                           final AtomicLong bytesRead = new AtomicLong(0);
-                           message.saveToOutputStream(new OutputStream()
-                           {
-
-                              public void write(byte b[]) throws IOException
-                              {
-                                 if (b[0] == getSamplebyte(bytesRead.get()))
-                                 {
-                                    bytesRead.addAndGet(b.length);
-                                    System.out.println("Read position " + bytesRead.get() + " on consumer");
-                                    if (bytesRead.get() == 1126400l)
-                                    {
-                                       System.out.println("I'm here");
-                                    }
-                                 }
-                                 else
-                                 {
-                                    System.out.println("Received invalid packet at position " + bytesRead.get());
-                                 }
-                              }
-
-                              @Override
-                              public void write(int b) throws IOException
-                              {
-                                 if (b == getSamplebyte(bytesRead.get()))
-                                 {
-                                    bytesRead.incrementAndGet();
-                                 }
-                                 else
-                                 {
-                                    System.out.println("byte not as expected!");
-                                 }
-                              }
-                           });
-
-                           assertEquals(numberOfBytes, bytesRead.get());
-                        }
-                        else
-                        {
-
-                           MessagingBuffer buffer = message.getBody();
-                           buffer.resetReaderIndex();
-                           assertEquals(numberOfBytes, buffer.writerIndex());
-                           for (long b = 0; b < numberOfBytes; b++)
-                           {
-                              if (b % (1024l * 1024l) == 0)
-                              {
-                                 System.out.println("Read " + b + " bytes");
-                              }
-                              
-                              assertEquals(getSamplebyte(b), buffer.readByte());
-                           }
-                        }
-                     }
-                     catch (Throwable e)
-                     {
-                        e.printStackTrace();
-                        System.out.println("Got an error");
-                        errors.incrementAndGet();
-                     }
-                     finally
-                     {
-                        latchDone.countDown();
-                        msgCounter++;
-                     }
-                  }
-               };
-
-               session.start();
-
-               consumer.setMessageHandler(handler);
-
-               assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
-               assertEquals(0, errors.get());
-
-            }
-            else
-            {
-
-               session.start();
-
-               for (int i = 0; i < numberOfMessages; i++)
-               {
-                  System.currentTimeMillis();
-
-                  ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
-
-                  assertNotNull(message);
-
-                  System.out.println("Message: " + i);
-
-                  System.currentTimeMillis();
-
-                  if (delayDelivery > 0)
-                  {
-                     long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
-                     assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
-                                System.currentTimeMillis() - originalTime >= delayDelivery);
-                  }
-
-                  if (!preAck)
-                  {
-                     message.acknowledge();
-                  }
-
-                  assertNotNull(message);
-
-                  if (delayDelivery <= 0)
-                  {
-                     // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
-                     // scheduled delivery time
-                     assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
-                  }
-
-                  MessagingBuffer buffer = message.getBody();
-                  buffer.resetReaderIndex();
-
-                  if (useStreamOnConsume)
-                  {
-                     final AtomicLong bytesRead = new AtomicLong(0);
-                     message.saveToOutputStream(new OutputStream()
-                     {
-
-                        public void write(byte b[]) throws IOException
-                        {
-                           if (b[0] == getSamplebyte(bytesRead.get()))
-                           {
-                              bytesRead.addAndGet(b.length);
-                           }
-                           else
-                           {
-                              System.out.println("Received invalid packet at position " + bytesRead.get());
-                           }
-
-                        }
-
-                        @Override
-                        public void write(int b) throws IOException
-                        {
-                           if (bytesRead.get() % (1024l * 1024l) == 0)
-                           {
-                              System.out.println("Read " + bytesRead.get() + " bytes");
-                           }
-                           if (b == (byte)'a')
-                           {
-                              bytesRead.incrementAndGet();
-                           }
-                           else
-                           {
-                              System.out.println("byte not as expected!");
-                           }
-                        }
-                     });
-
-                     assertEquals(numberOfBytes, bytesRead.get());
-                  }
-                  else
-                  {
-                     for (long b = 0; b < numberOfBytes; b++)
-                     {
-                        if (b % (1024l * 1024l) == 0l)
-                        {
-                           System.out.println("Read " + b + " bytes");
-                        }
-                        assertEquals(getSamplebyte(b), buffer.readByte());
-                     }
-                  }
-
-               }
-
-            }
-            consumer.close();
-
-            if (iteration == 0)
-            {
-               if (isXA)
-               {
-                  session.end(xid, XAResource.TMSUCCESS);
-                  session.rollback(xid);
-                  xid = newXID();
-                  session.start(xid, XAResource.TMNOFLAGS);
-               }
-               else
-               {
-                  session.rollback();
-               }
-            }
-            else
-            {
-               if (isXA)
-               {
-                  session.end(xid, XAResource.TMSUCCESS);
-                  session.commit(xid, true);
-                  xid = newXID();
-                  session.start(xid, XAResource.TMNOFLAGS);
-               }
-               else
-               {
-                  session.commit();
-               }
-            }
-         }
-
-         session.close();
-
-         long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
-         assertEquals(0l, globalSize);
-         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
-         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
-         validateNoFilesOnLargeDir();
-
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
-   /**
-    * @param useFile
-    * @param numberOfMessages
-    * @param numberOfIntegers
-    * @param delayDelivery
-    * @param testTime
-    * @param session
-    * @param producer
-    * @throws FileNotFoundException
-    * @throws IOException
-    * @throws MessagingException
-    */
-   private void sendMessages(final int numberOfMessages,
-                             final long numberOfBytes,
-                             final long delayDelivery,
-                             final ClientSession session,
-                             final ClientProducer producer) throws Exception
-   {
-      System.out.println("NumberOfBytes = " + numberOfBytes);
-      for (int i = 0; i < numberOfMessages; i++)
-      {
-         ClientMessage message = session.createClientMessage(true);
-
-         // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
-         // test
-         if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
-         {
-            System.out.println("Sending message (stream)" + i);
-            message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-         }
-         else
-         {
-            System.out.println("Sending message (array)" + i);
-            byte[] bytes = new byte[(int)numberOfBytes];
-            for (int j = 0; j < bytes.length; j++)
-            {
-               bytes[j] = getSamplebyte(j);
-            }
-            message.getBody().writeBytes(bytes);
-         }
-         message.putIntProperty(new SimpleString("counter-message"), i);
-         if (delayDelivery > 0)
-         {
-            long time = System.currentTimeMillis();
-            message.putLongProperty(new SimpleString("original-time"), time);
-            message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
-            producer.send(message);
-         }
-         else
-         {
-            producer.send(message);
-         }
-      }
-   }
-
-   protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
-   {
-      MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
-
-      for (int i = 0; i < numberOfIntegers; i++)
-      {
-         body.writeInt(i);
-      }
-
-      return body;
-
-   }
-
-   protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
-   {
-      return createLargeClientMessage(session, numberOfBytes, true);
-   }
-
-   protected ClientMessage createLargeClientMessage(final ClientSession session,
-                                                    final long numberOfBytes,
-                                                    final boolean persistent) throws Exception
-   {
-
-      ClientMessage clientMessage = session.createClientMessage(persistent);
-
-      clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-
-      return clientMessage;
-   }
-
-   /**
-    * @param session
-    * @param queueToRead
-    * @param numberOfIntegers
-    * @throws MessagingException
-    * @throws FileNotFoundException
-    * @throws IOException
-    */
-   protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
-                                                                                                                   FileNotFoundException,
-                                                                                                                   IOException
-   {
-      session.start();
-
-      ClientConsumer consumer = session.createConsumer(queueToRead);
-
-      ClientMessage clientMessage = consumer.receive(5000);
-
-      assertNotNull(clientMessage);
-
-      clientMessage.acknowledge();
-
-      session.commit();
-
-      consumer.close();
-   }
-
-   /**
-    * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
-    */
-   protected void validateNoFilesOnLargeDir() throws Exception
-   {
-      File largeMessagesFileDir = new File(getLargeMessagesDir());
-
-      // Deleting the file is async... we keep looking for a period of the time until the file is really gone
-      for (int i = 0; i < 100; i++)
-      {
-         if (largeMessagesFileDir.listFiles().length > 0)
-         {
-            Thread.sleep(10);
-         }
-         else
-         {
-            break;
-         }
-      }
-
-      assertEquals(0, largeMessagesFileDir.listFiles().length);
-   }
-
-   protected OutputStream createFakeOutputStream() throws Exception
-   {
-
-      return new OutputStream()
-      {
-         private boolean closed = false;
-
-         private int count;
-
-         @Override
-         public void close() throws IOException
-         {
-            super.close();
-            closed = true;
-         }
-
-         @Override
-         public void write(final int b) throws IOException
-         {
-            if (count++ % 1024 * 1024 == 0)
-            {
-               System.out.println("OutputStream received " + count + " bytes");
-            }
-            if (closed)
-            {
-               throw new IOException("Stream was closed");
-            }
-         }
-
-      };
-
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java	2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -43,7 +43,7 @@
 import org.jboss.messaging.core.server.MessagingServer;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+import org.jboss.messaging.tests.integration.largemessage.LargeMessageTestBase;
 import org.jboss.messaging.utils.DataConstants;
 import org.jboss.messaging.utils.SimpleString;
 
@@ -80,12 +80,12 @@
    {
       internalTestResendMessage(50000);
    }
-   
+
    public void testResendLargeStreamMessage() throws Exception
    {
       internalTestResendMessage(150 * 1024);
    }
-   
+
    public void internalTestResendMessage(long messageSize) throws Exception
    {
       ClientSession session = null;
@@ -101,7 +101,7 @@
          session = sf.createSession(false, false, false);
 
          session.createQueue(ADDRESS, ADDRESS, true);
-         
+
          SimpleString ADDRESS2 = ADDRESS.concat("-2");
 
          session.createQueue(ADDRESS2, ADDRESS2, true);
@@ -115,18 +115,17 @@
          producer.send(clientFile);
 
          session.commit();
-         
+
          session.start();
-         
+
          ClientConsumer consumer = session.createConsumer(ADDRESS);
          ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
-         
+
          ClientMessage msg1 = consumer.receive(10000);
          msg1.acknowledge();
 
          producer2.send(msg1);
-         
-         
+
          try
          {
             producer2.send(msg1);
@@ -137,23 +136,22 @@
          }
 
          session.commit();
-         
+
          ClientMessage msg2 = consumer2.receive(10000);
-         
+
          assertNotNull(msg2);
-         
+
          msg2.acknowledge();
-         
+
          session.commit();
-         
+
          assertEquals(messageSize, msg2.getBodySize());
-         
-         
-         for (int i = 0 ; i < messageSize; i++)
+
+         for (int i = 0; i < messageSize; i++)
          {
             assertEquals(getSamplebyte(i), msg2.getBody().readByte());
          }
-         
+
          session.close();
 
          validateNoFilesOnLargeDir();
@@ -177,9 +175,23 @@
          }
       }
    }
+
    public void testFilePersistenceOneHugeMessage() throws Exception
    {
-      testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
+      testChunks(false,
+                 false,
+                 true,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 1,
+                 100 * 1024l * 1024l,
+                 RECEIVE_WAIT_TIME,
+                 0,
+                 10 * 1024 * 1024,
+                 1024 * 1024);
    }
 
    public void testFilePersistenceOneMessageStreaming() throws Exception
@@ -194,7 +206,20 @@
 
    public void testFilePersistenceOneHugeMessageConsumer() throws Exception
    {
-      testChunks(false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
+      testChunks(false,
+                 false,
+                 true,
+                 true,
+                 false,
+                 false,
+                 false,
+                 true,
+                 1,
+                 100 * 1024 * 1024,
+                 120000,
+                 0,
+                 10 * 1024 * 1024,
+                 1024 * 1024);
    }
 
    public void testFilePersistence() throws Exception
@@ -890,6 +915,109 @@
       }
    }
 
+   public void testReceiveMultipleMessages() throws Exception
+   {
+      ClientSession session = null;
+      MessagingServer server = null;
+
+      final int SIZE = 10 * 1024;
+      final int NUMBER_OF_MESSAGES = 1000;
+      try
+      {
+
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setMinLargeMessageSize(1024);
+         sf.setConsumerWindowSize(1024 * 1024);
+
+         session = sf.createSession(null, null, false, false, false, false, 0);
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+         {
+            Message clientFile = session.createClientMessage(true);
+            clientFile.setBodyInputStream(createFakeLargeStream(SIZE));
+            producer.send(clientFile);
+
+         }
+         session.commit();
+         producer.close();
+
+         session.start();
+
+         // Read the messages and roll back, then read them again and commit
+         for (int trans = 0; trans < 2; trans++)
+         {
+
+            ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+            // Wait up to 10 seconds for the consumer to buffer at least 10 messages before receiving any
+            long timeout = System.currentTimeMillis() + 10000;
+            while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis())
+            {
+               Thread.sleep(10);
+            }
+
+            for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+            {
+               ClientMessage msg = consumer.receive(10000);
+               assertNotNull(msg);
+
+               // the body is read and verified only on the first pass; the second pass just acknowledges
+               if (trans == 0)
+               {
+                  for (int byteRead = 0; byteRead < SIZE; byteRead++)
+                  {
+                     assertEquals(getSamplebyte(byteRead), msg.getBody().readByte());
+                  }
+               }
+
+               msg.acknowledge();
+            }
+            if (trans == 0)
+            {
+               session.rollback();
+            }
+            else
+            {
+               session.commit();
+            }
+
+            consumer.close();
+         }
+
+         assertEquals(0l, server.getPostOffice().getPagingManager().getGlobalSize());
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testSendStreamingSingleMessage() throws Exception
    {
       ClientSession session = null;

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -0,0 +1,207 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.largemessage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.largemessage.mock.MockConnector;
+import org.jboss.messaging.tests.integration.largemessage.mock.MockConnectorFactory;
+
+/**
+ * A LargeMessageCleanupTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeMessageCleanupTest extends LargeMessageTestBase
+{
+   // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
+
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   
+   public void testCleanup() throws Exception
+   {
+      clearData();
+      
+      FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
+      
+      fileOut.write(new byte[1024]); // anything
+      
+      fileOut.close();
+
+      Configuration config = createDefaultConfig();
+
+      server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+
+         File directoryLarge = new File(getLargeMessagesDir());
+
+         assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   public void testFailureOnSendingFile() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setPagingMaxGlobalSizeBytes(20 * 1024);
+      config.setPagingGlobalWatermarkSize(10 * 1024);
+
+      server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int numberOfBytes = 2 * 1024 * 1024;
+
+      ClientSession session = null;
+
+      class LocalCallback implements MockConnector.MockCallback
+      {
+         AtomicInteger counter = new AtomicInteger(0);
+
+         ClientSession session;
+
+         public void onWrite(final MessagingBuffer buffer)
+         {
+            log.info("calling cb onwrite** ");
+            if (counter.incrementAndGet() == 5)
+            {
+               RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+               RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
+               remotingServiceImpl.connectionException(conn.getID(),
+                                                       new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
+               conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+               throw new IllegalStateException("blah");
+            }
+         }
+      }
+
+      LocalCallback callback = new LocalCallback();
+
+      try
+      {
+         HashMap<String, Object> parameters = new HashMap<String, Object>();
+         parameters.put("callback", callback);
+
+         TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
+                                                                       parameters);
+
+         ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
+
+         mockFactory.setBlockOnNonPersistentSend(false);
+         mockFactory.setBlockOnPersistentSend(false);
+         mockFactory.setBlockOnAcknowledge(false);
+
+         session = mockFactory.createSession(null, null, false, true, true, false, 0);
+
+         callback.session = session;
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
+
+         try
+         {
+            producer.send(clientLarge);
+            
+            fail("Exception was expected!");
+         }
+         catch (Exception e)
+         {
+         }
+
+         validateNoFilesOnLargeDir();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Exception ignored)
+         {
+            ignored.printStackTrace();
+         }
+      }
+
+   }
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -0,0 +1,670 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.largemessage;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public class LargeMessageTestBase extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+   protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   protected MessagingServer server;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void testChunks(final boolean isXA,
+                             final boolean rollbackFirstSend,
+                             final boolean useStreamOnConsume,
+                             final boolean realFiles,
+                             final boolean preAck,
+                             final boolean sendingBlocking,
+                             final boolean testBrowser,
+                             final boolean useMessageConsumer,
+                             final int numberOfMessages,
+                             final long numberOfBytes,
+                             final int waitOnConsumer,
+                             final long delayDelivery) throws Exception
+   {
+      testChunks(isXA,
+                 rollbackFirstSend,
+                 useStreamOnConsume,
+                 realFiles,
+                 preAck,
+                 sendingBlocking,
+                 testBrowser,
+                 useMessageConsumer,
+                 numberOfMessages,
+                 numberOfBytes,
+                 waitOnConsumer,
+                 delayDelivery,
+                 -1,
+                 10 * 1024);
+   }
+
+   protected void testChunks(final boolean isXA,
+                             final boolean rollbackFirstSend,
+                             final boolean useStreamOnConsume,
+                             final boolean realFiles,
+                             final boolean preAck,
+                             final boolean sendingBlocking,
+                             final boolean testBrowser,
+                             final boolean useMessageConsumer,
+                             final int numberOfMessages,
+                             final long numberOfBytes,
+                             final int waitOnConsumer,
+                             final long delayDelivery,
+                             final int producerWindow,
+                             final int minSize) throws Exception
+   {
+      clearData();
+
+      server = createServer(realFiles);
+      server.start();
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         if (sendingBlocking)
+         {
+            sf.setBlockOnNonPersistentSend(true);
+            sf.setBlockOnPersistentSend(true);
+            sf.setBlockOnAcknowledge(true);
+         }
+
+         if (producerWindow > 0)
+         {
+            sf.setProducerWindowSize(producerWindow);
+         }
+
+         sf.setMinLargeMessageSize(minSize);
+
+         ClientSession session;
+
+         Xid xid = null;
+         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+         if (isXA)
+         {
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
+
+         session.createQueue(ADDRESS, ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         if (rollbackFirstSend)
+         {
+            sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+            if (isXA)
+            {
+               session.end(xid, XAResource.TMSUCCESS);
+               session.rollback(xid);
+               xid = newXID();
+               session.start(xid, XAResource.TMNOFLAGS);
+            }
+            else
+            {
+               session.rollback();
+            }
+
+            validateNoFilesOnLargeDir();
+         }
+
+         sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+         if (isXA)
+         {
+            session.end(xid, XAResource.TMSUCCESS);
+            session.commit(xid, true);
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
+         else
+         {
+            session.commit();
+         }
+
+         session.close();
+
+         if (realFiles)
+         {
+            server.stop();
+
+            server = createServer(realFiles);
+            server.start();
+
+            sf = createInVMFactory();
+         }
+
+         session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+         if (isXA)
+         {
+            xid = newXID();
+            session.start(xid, XAResource.TMNOFLAGS);
+         }
+
+         ClientConsumer consumer = null;
+
+         for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+         {
+
+            System.out.println("Iteration: " + iteration);
+
+            session.stop();
+
+            // first time with a browser
+            consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+            if (useMessageConsumer)
+            {
+               final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+               final AtomicInteger errors = new AtomicInteger(0);
+
+               MessageHandler handler = new MessageHandler()
+               {
+                  int msgCounter;
+
+                  public void onMessage(final ClientMessage message)
+                  {
+
+                     try
+                     {
+                        System.out.println("Message on consumer: " + msgCounter);
+
+                        if (delayDelivery > 0)
+                        {
+                           long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+                           assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+                                      System.currentTimeMillis() - originalTime >= delayDelivery);
+                        }
+
+                        if (!preAck)
+                        {
+                           message.acknowledge();
+                        }
+
+                        assertNotNull(message);
+
+                        if (delayDelivery <= 0)
+                        {
+                           // right now there is no guarantee of ordered delivery for multiple scheduled messages
+                           // with the same scheduled delivery time, so the counter is only checked when
+                           // delivery is not delayed
+                           assertEquals(msgCounter,
+                                        ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+                        }
+
+                        if (useStreamOnConsume)
+                        {
+                           final AtomicLong bytesRead = new AtomicLong(0);
+                           message.saveToOutputStream(new OutputStream()
+                           {
+
+                              public void write(byte b[]) throws IOException
+                              {
+                                 if (b[0] == getSamplebyte(bytesRead.get()))
+                                 {
+                                    bytesRead.addAndGet(b.length);
+                                    System.out.println("Read position " + bytesRead.get() + " on consumer");
+                                    if (bytesRead.get() == 1126400l)
+                                    {
+                                       System.out.println("I'm here");
+                                    }
+                                 }
+                                 else
+                                 {
+                                    System.out.println("Received invalid packet at position " + bytesRead.get());
+                                 }
+                              }
+
+                              @Override
+                              public void write(int b) throws IOException
+                              {
+                                 if (b == getSamplebyte(bytesRead.get()))
+                                 {
+                                    bytesRead.incrementAndGet();
+                                 }
+                                 else
+                                 {
+                                    System.out.println("byte not as expected!");
+                                 }
+                              }
+                           });
+
+                           assertEquals(numberOfBytes, bytesRead.get());
+                        }
+                        else
+                        {
+
+                           MessagingBuffer buffer = message.getBody();
+                           buffer.resetReaderIndex();
+                           assertEquals(numberOfBytes, buffer.writerIndex());
+                           for (long b = 0; b < numberOfBytes; b++)
+                           {
+                              if (b % (1024l * 1024l) == 0)
+                              {
+                                 System.out.println("Read " + b + " bytes");
+                              }
+                              
+                              assertEquals(getSamplebyte(b), buffer.readByte());
+                           }
+                        }
+                     }
+                     catch (Throwable e)
+                     {
+                        e.printStackTrace();
+                        System.out.println("Got an error");
+                        errors.incrementAndGet();
+                     }
+                     finally
+                     {
+                        latchDone.countDown();
+                        msgCounter++;
+                     }
+                  }
+               };
+
+               session.start();
+
+               consumer.setMessageHandler(handler);
+
+               assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+               assertEquals(0, errors.get());
+
+            }
+            else
+            {
+
+               session.start();
+
+               for (int i = 0; i < numberOfMessages; i++)
+               {
+                  System.currentTimeMillis();
+
+                  ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
+
+                  assertNotNull(message);
+
+                  System.out.println("Message: " + i);
+
+                  System.currentTimeMillis();
+
+                  if (delayDelivery > 0)
+                  {
+                     long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+                     assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+                                System.currentTimeMillis() - originalTime >= delayDelivery);
+                  }
+
+                  if (!preAck)
+                  {
+                     message.acknowledge();
+                  }
+
+                  assertNotNull(message);
+
+                  if (delayDelivery <= 0)
+                  {
+                     // right now there is no guarantee of ordered delivery for multiple scheduled messages with the
+                     // same scheduled delivery time, so the counter is only checked when delivery is not delayed
+                     assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+                  }
+
+                  MessagingBuffer buffer = message.getBody();
+                  buffer.resetReaderIndex();
+
+                  if (useStreamOnConsume)
+                  {
+                     final AtomicLong bytesRead = new AtomicLong(0);
+                     message.saveToOutputStream(new OutputStream()
+                     {
+
+                        public void write(byte b[]) throws IOException
+                        {
+                           if (b[0] == getSamplebyte(bytesRead.get()))
+                           {
+                              bytesRead.addAndGet(b.length);
+                           }
+                           else
+                           {
+                              System.out.println("Received invalid packet at position " + bytesRead.get());
+                           }
+
+                        }
+
+                        @Override
+                        public void write(int b) throws IOException
+                        {
+                           if (bytesRead.get() % (1024l * 1024l) == 0)
+                           {
+                              System.out.println("Read " + bytesRead.get() + " bytes");
+                           }
+                           if (b == (byte)'a')
+                           {
+                              bytesRead.incrementAndGet();
+                           }
+                           else
+                           {
+                              System.out.println("byte not as expected!");
+                           }
+                        }
+                     });
+
+                     assertEquals(numberOfBytes, bytesRead.get());
+                  }
+                  else
+                  {
+                     for (long b = 0; b < numberOfBytes; b++)
+                     {
+                        if (b % (1024l * 1024l) == 0l)
+                        {
+                           System.out.println("Read " + b + " bytes");
+                        }
+                        assertEquals(getSamplebyte(b), buffer.readByte());
+                     }
+                  }
+
+               }
+
+            }
+            consumer.close();
+
+            if (iteration == 0)
+            {
+               if (isXA)
+               {
+                  session.end(xid, XAResource.TMSUCCESS);
+                  session.rollback(xid);
+                  xid = newXID();
+                  session.start(xid, XAResource.TMNOFLAGS);
+               }
+               else
+               {
+                  session.rollback();
+               }
+            }
+            else
+            {
+               if (isXA)
+               {
+                  session.end(xid, XAResource.TMSUCCESS);
+                  session.commit(xid, true);
+                  xid = newXID();
+                  session.start(xid, XAResource.TMNOFLAGS);
+               }
+               else
+               {
+                  session.commit();
+               }
+            }
+         }
+
+         session.close();
+
+         long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
+         assertEquals(0l, globalSize);
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+         validateNoFilesOnLargeDir();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   /**
+    * Sends {@code numberOfMessages} persistent messages with a body of {@code numberOfBytes} bytes each. Every
+    * message carries its index in the "counter-message" property.
+    *
+    * @param numberOfMessages number of messages to send
+    * @param numberOfBytes size of each message body, in bytes
+    * @param delayDelivery when greater than zero, offset in milliseconds added to the send time and stored in the
+    *           scheduled-delivery header
+    * @param session session used to create the messages
+    * @param producer producer the messages are sent through
+    * @throws Exception if a message cannot be created, filled or sent
+    */
+   private void sendMessages(final int numberOfMessages,
+                             final long numberOfBytes,
+                             final long delayDelivery,
+                             final ClientSession session,
+                             final ClientProducer producer) throws Exception
+   {
+      System.out.println("NumberOfBytes = " + numberOfBytes);
+      for (int i = 0; i < numberOfMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+
+         // Bodies above 1MiB are always streamed: building them as an array would need too much memory in the
+         // test. Smaller bodies alternate between the stream (even i) and the array (odd i) code paths.
+         if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
+         {
+            System.out.println("Sending message (stream)" + i);
+            message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+         }
+         else
+         {
+            System.out.println("Sending message (array)" + i);
+            byte[] bytes = new byte[(int)numberOfBytes];
+            for (int j = 0; j < bytes.length; j++)
+            {
+               bytes[j] = getSamplebyte(j);
+            }
+            message.getBody().writeBytes(bytes);
+         }
+
+         message.putIntProperty(new SimpleString("counter-message"), i);
+
+         if (delayDelivery > 0)
+         {
+            // Record the send time and set the scheduled-delivery header to send time + delay
+            long time = System.currentTimeMillis();
+            message.putLongProperty(new SimpleString("original-time"), time);
+            message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+         }
+
+         // The message is sent the same way whether or not its delivery is delayed
+         producer.send(message);
+      }
+   }
+
+   /** Allocates DataConstants.SIZE_INT * numberOfIntegers bytes and writes the ints 0 .. numberOfIntegers - 1. */
+   protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
+   {
+      final int sizeInBytes = DataConstants.SIZE_INT * numberOfIntegers;
+      final MessagingBuffer buffer = ChannelBuffers.buffer(sizeInBytes);
+      int value = 0;
+      while (value < numberOfIntegers)
+      {
+         buffer.writeInt(value++);
+      }
+      return buffer;
+   }
+
+   /** Creates a persistent message whose body is the stream returned by createFakeLargeStream(numberOfBytes). */
+   protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
+   {
+      final boolean persistent = true;
+      return createLargeClientMessage(session, numberOfBytes, persistent);
+   }
+
+   /** Creates a message with the given persistence and sets createFakeLargeStream(numberOfBytes) as its body stream. */
+   protected ClientMessage createLargeClientMessage(final ClientSession session,
+                                                    final long numberOfBytes,
+                                                    final boolean persistent) throws Exception
+   {
+      final ClientMessage largeMessage = session.createClientMessage(persistent);
+      largeMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+      return largeMessage;
+   }
+
+   /**
+    * Receives a single message from {@code queueToRead} with {@code receive(5000)}, asserts that one arrived,
+    * acknowledges it and commits the session. The consumer is closed even when the assertion fails.
+    * @param session session that is started, used to create the consumer, and committed
+    * @param queueToRead queue to consume from
+    * @param numberOfBytes currently unused: the body of the received message is not read or validated
+    * @throws MessagingException if a session or consumer operation fails
+    */
+   protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
+                                                                                                                   FileNotFoundException,
+                                                                                                                   IOException
+   {
+      session.start();
+      ClientConsumer consumer = session.createConsumer(queueToRead);
+      try
+      {
+         ClientMessage clientMessage = consumer.receive(5000);
+         assertNotNull(clientMessage);
+         clientMessage.acknowledge();
+         session.commit();
+      }
+      finally
+      {
+         consumer.close();
+      }
+   }
+
+   /**
+    * Asserts that no file is left in the large-messages directory. Deleting a large-message file is an asynchronous
+    * process, so the directory is re-checked every 10ms (up to 100 times) before the final assertion is made.
+    * A directory that cannot be listed ({@code listFiles()} returns null, e.g. it does not exist) counts as empty.
+    */
+   protected void validateNoFilesOnLargeDir() throws Exception
+   {
+      File largeMessagesFileDir = new File(getLargeMessagesDir());
+
+      // listFiles() returns null rather than an empty array when the path is not a listable directory
+      File[] leftovers = largeMessagesFileDir.listFiles();
+
+      // Deleting the file is async... we keep looking for a period of time until the files are really gone
+      for (int attempt = 0; attempt < 100 && leftovers != null && leftovers.length > 0; attempt++)
+      {
+         Thread.sleep(10);
+         leftovers = largeMessagesFileDir.listFiles();
+      }
+
+      int remaining = leftovers == null ? 0 : leftovers.length;
+      assertEquals("Files left in " + largeMessagesFileDir, 0, remaining);
+   }
+
+   /**
+    * Creates an {@link OutputStream} that discards everything written to it: writes only succeed while the stream
+    * is open, and any write after {@code close()} fails with an {@link IOException}.
+    *
+    * @return a new sink stream
+    * @throws Exception not thrown by this implementation; kept so the signature stays unchanged for callers
+    */
+   protected OutputStream createFakeOutputStream() throws Exception
+   {
+      return new OutputStream()
+      {
+         private boolean closed = false;
+
+         @Override
+         public void close() throws IOException
+         {
+            super.close();
+            closed = true;
+         }
+
+         @Override
+         public void write(final int b) throws IOException
+         {
+            // Bytes are not stored anywhere; the only contract is that a closed stream rejects writes
+            if (closed)
+            {
+               throw new IOException("Stream was closed");
+            }
+         }
+
+      };
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java	2009-04-30 16:08:56 UTC (rev 6637)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -20,7 +20,7 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-package org.jboss.messaging.tests.integration.chunkmessage.mock;
+package org.jboss.messaging.tests.integration.largemessage.mock;
 
 import java.util.Map;
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnectorFactory.java	2009-04-30 16:08:56 UTC (rev 6637)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -20,7 +20,7 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-package org.jboss.messaging.tests.integration.chunkmessage.mock;
+package org.jboss.messaging.tests.integration.largemessage.mock;
 
 import java.util.Map;
 

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java	2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -22,7 +22,7 @@
 
 package org.jboss.messaging.tests.stress.chunk;
 
-import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+import org.jboss.messaging.tests.integration.largemessage.LargeMessageTestBase;
 
 /**
  * A MessageChunkSoakTest

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2009-04-30 17:09:51 UTC (rev 6639)
@@ -407,7 +407,6 @@
          public void close() throws IOException
          {
             super.close();
-            System.out.println("Sent " + count + " bytes over fakeOutputStream, while size = " + size);
             closed = true;
          }
 




More information about the jboss-cvs-commits mailing list