[jboss-cvs] JBoss Messaging SVN: r5194 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/journal/impl and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 28 19:58:25 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-10-28 19:58:25 -0400 (Tue, 28 Oct 2008)
New Revision: 5194

Modified:
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
   branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Dealing with flow control on message chunks

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -57,15 +57,12 @@
    {
       super();
    }
-   
 
-   public FileClientMessageImpl(boolean durable)
+   public FileClientMessageImpl(final boolean durable)
    {
       super(durable, null);
    }
 
-
-
    /**
     * @param type
     * @param durable
@@ -74,32 +71,28 @@
     * @param priority
     * @param body
     */
-   public FileClientMessageImpl(byte type,
-                                boolean durable,
-                                long expiration,
-                                long timestamp,
-                                byte priority,
-                                MessagingBuffer body)
+   public FileClientMessageImpl(final byte type,
+                                final boolean durable,
+                                final long expiration,
+                                final long timestamp,
+                                final byte priority,
+                                final MessagingBuffer body)
    {
       super(type, durable, expiration, timestamp, priority, body);
       // TODO Auto-generated constructor stub
    }
 
-
-
    /**
     * @param type
     * @param durable
     * @param body
     */
-   public FileClientMessageImpl(byte type, boolean durable, MessagingBuffer body)
+   public FileClientMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
    {
       super(type, durable, body);
       // TODO Auto-generated constructor stub
    }
 
-
-
    /**
     * @param deliveryCount
     */
@@ -167,27 +160,27 @@
          }
       }
    }
-   
-   public synchronized void encodeBody(MessagingBuffer buffer, int start, int size)
+
+   @Override
+   public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
    {
       try
       {
          FileChannel channel = getChannel();
-         
+
          ByteBuffer bufferRead = ByteBuffer.allocate(size);
-         
+
          channel.position(start);
          channel.read(bufferRead);
-         
+
          buffer.putBytes(bufferRead.array());
       }
       catch (Exception e)
       {
          throw new RuntimeException(e.getMessage(), e);
       }
-      
+
    }
-   
 
    @Override
    public void setBody(final MessagingBuffer body)
@@ -246,13 +239,13 @@
       {
          RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
          randomFile.seek(0);
-   
+
          FileChannel channel = randomFile.getChannel();
          return channel;
       }
       catch (IOException e)
       {
-         throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);  
+         throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
       }
    }
 

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -40,6 +40,11 @@
    public NIOSequentialFileFactory(final String journalDir)
    {
       super(journalDir);
+      
+      if (journalDir == null)
+      {
+         new Exception ("journalDir is null").printStackTrace();
+      }
    }
 
    // maxIO is ignored on NIO

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -87,7 +87,7 @@
    
    
    // Used on Message chunk
-   void encodeBody(MessagingBuffer buffer, int start, int size);
+   void encodeBody(MessagingBuffer buffer, long start, int size);
    
    void encodeBody(MessagingBuffer buffer);
    

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -193,9 +193,9 @@
    }
    
    // Used on Message chunk
-   public void encodeBody(MessagingBuffer buffer, int start, int size)
+   public void encodeBody(MessagingBuffer buffer, long start, int size)
    {
-      buffer.putBytes(body.array(), start, size);
+      buffer.putBytes(body.array(), (int)start, size);
    }
    
 

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -88,7 +88,7 @@
    }
 
    @Override
-   public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
+   public synchronized void encodeBody(final MessagingBuffer bufferOut, final long start, final int size)
    {
       new Exception ("Encode body");
       validateFile();
@@ -220,6 +220,7 @@
    {
       if (file.isOpen())
       {
+         System.out.println("Closing file " + file);
          try
          {
             file.close();

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -949,12 +949,15 @@
     */
    private void cleanupIncompleteFiles() throws Exception
    {
-      List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
-      for (String tmpFile: tmpFiles)
+      if (largeMessagesFactory != null)
       {
-         SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
-         System.out.println("cleaning up file " + file);
-         file.delete();
+         List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
+         for (String tmpFile : tmpFiles)
+         {
+            SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+            System.out.println("cleaning up file " + file);
+            file.delete();
+         }
       }
    }
 

Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -27,6 +27,7 @@
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -91,7 +92,7 @@
 
    private final Lock lock = new ReentrantLock();
 
-   private final Semaphore availableCredits;
+   private final AtomicInteger availableCredits;
 
    private boolean started;
 
@@ -107,6 +108,13 @@
    private final PostOffice postOffice;
 
    private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
+   
+   /** The current message being processed */
+   private volatile ServerLargeMessage pendingLargeMessage;
+   
+   /** The current position on the message being processed */
+   private volatile long positionLargeMessage;
+   
 
    private final Channel channel;
 
@@ -140,7 +148,7 @@
 
       if (enableFlowControl)
       {
-         availableCredits = new Semaphore(0);
+         availableCredits = new AtomicInteger(0);
       }
       else
       {
@@ -170,7 +178,7 @@
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
-      if (availableCredits != null && availableCredits.availablePermits() <= 0)
+      if (availableCredits != null && availableCredits.get() <= 0)
       {
          return HandleStatus.BUSY;
       }
@@ -189,6 +197,12 @@
       
       try
       {
+         
+         if (pendingLargeMessage != null)
+         {
+            new Exception("Busy because of pendingLargeMessage").printStackTrace();
+            return HandleStatus.BUSY;
+         }
       
          // If the consumer is stopped then we don't accept the message, it
          // should go back into the
@@ -215,10 +229,13 @@
 
          if (message instanceof ServerLargeMessage)
          {
+// Todo: How to handle large-files on clustering            
 //            if (result == null)
 //            {
                // Not replicated - just send now
-               sendChunks((ServerLargeMessage)message);
+               pendingLargeMessage = (ServerLargeMessage)message;
+               positionLargeMessage = 0;
+               sendChunks();
 //            }
 //            else
 //            {
@@ -245,7 +262,7 @@
    
             if (availableCredits != null)
             {
-               availableCredits.release(message.getEncodeSize());
+               availableCredits.addAndGet(-message.getEncodeSize());
             }
 
             if (result == null)
@@ -369,10 +386,7 @@
    {
       if (availableCredits != null)
       {
-         int previous;
-         
-         previous = availableCredits.availablePermits(); 
-         availableCredits.release(credits);
+         int previous = availableCredits.getAndAdd(credits);
 
          if (previous <= 0 && previous + credits > 0)
          {
@@ -460,59 +474,104 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   /**
-    * @param message
-    * @throws MessagingException
-    */
-   private void sendChunks(ServerLargeMessage message) throws Exception
+   private boolean sendChunks()
    {
-      final int bodySize = message.getBodySize();
       
-      int chunkLength = 0;
+      lock.lock();
       
-      SessionSendChunkMessage chunk = null;
-      
-      for (int pos = 0; pos < bodySize; pos += chunkLength)
+      try
       {
          
-         if (pos == 0)
+         final long bodySize = pendingLargeMessage.getBodySize();
+   
+         int chunkLength = 0;
+   
+         SessionSendChunkMessage chunk = null;
+   
+         for (; positionLargeMessage < bodySize; positionLargeMessage += chunkLength)
          {
-            int headerSize = message.getPropertiesEncodeSize();
-
-            chunkLength = minLargeMessageSize - headerSize;
-
-            MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(message.getPropertiesEncodeSize()));
-            message.encodeProperties(headerBuffer);
             
-            MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
-            message.encodeBody(bodyBuffer, 0, chunkLength);
-
-            chunk = new SessionSendChunkMessage(id,
-                                                headerBuffer.array(),
-                                                bodyBuffer.array(),
-                                                chunkLength < bodySize,
-                                                false);
+            if (availableCredits.get() <= 0)
+            {
+               System.out.println("Cancelling.. not enough credits");
+               return false;
+            }
+            else
+            {
+               System.out.println("good!!!");
+            }
+   
+            if (positionLargeMessage == 0)
+            {
+               int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+   
+               chunkLength = minLargeMessageSize - headerSize;
+   
+               MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+               pendingLargeMessage.encodeProperties(headerBuffer);
+   
+               MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
+               pendingLargeMessage.encodeBody(bodyBuffer, 0, chunkLength);
+   
+               if (availableCredits != null)
+               {
+                  availableCredits.addAndGet(-chunkLength);
+               }
+   
+               chunk = new SessionSendChunkMessage(id,
+                                                   headerBuffer.array(),
+                                                   bodyBuffer.array(),
+                                                   chunkLength < bodySize,
+                                                   false);
+            }
+            else
+            {
+               chunkLength = (int)Math.min(bodySize - positionLargeMessage, minLargeMessageSize);
+   
+               if (availableCredits != null)
+               {
+                  int leftCredits = availableCredits.addAndGet(-chunkLength);
+//                  if (leftCredits < 0)
+//                  {
+//                     if (chunkLength > 0)
+//                     {
+//                        availableCredits.addAndGet(-leftCredits);
+//                     }
+//                     else
+//                     {
+//                        // sanity check only.. it shouldn't happen
+//                        // This next statement means, we didn't have enough credit to send anything, so we return the credits and give up sending
+//                        availableCredits.addAndGet(chunkLength);
+//                        return false;
+//                     }
+//                  }
+               }
+   
+               MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
+   
+               pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, chunkLength);
+   
+               chunk = new SessionSendChunkMessage(id,
+                                                   null,
+                                                   bodyBuffer.array(),
+                                                   positionLargeMessage + chunkLength < bodySize,
+                                                   false);
+            }
+   
+            channel.send(chunk);
          }
-         else
-         {
-            chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
-            MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+   
+         pendingLargeMessage.releaseResources();
+         this.pendingLargeMessage = null;
+         this.positionLargeMessage = -1;
+   
+         return true;
+      }
+      finally
+      {
+         lock.unlock();
+      }
 
-            message.encodeBody(bodyBuffer, pos, chunkLength);
-            
-
-            chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + chunkLength < bodySize, false);
-         }
-
-         if (availableCredits != null)
-         {
-            availableCredits.acquire(chunk.getPacketSize());
-         }
-
-         channel.send(chunk);
-      }
-      
-      message.releaseResources();
    }
    
    private void doClose() throws Exception
@@ -543,7 +602,18 @@
    
    private void promptDelivery()
    {
-      session.promptDelivery(messageQueue);
+      if (pendingLargeMessage != null)
+      {
+         if (sendChunks())
+         {
+            // prompt Delivery only if chunk was finished
+            session.promptDelivery(messageQueue);
+         }
+      }
+      else
+      {
+         session.promptDelivery(messageQueue);
+      }
    }
 
    // Inner classes

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -230,7 +230,7 @@
 
    public void testMessageChunkNullPersistenceDelayed() throws Exception
    {
-      testInternal(false, false, 100, 50000, false, 1000, 100);
+      testInternal(false, false, 100, 50000, false, 10000, 100);
    }
 
    public void testMessageChunkFilePersistence() throws Exception

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -21,70 +21,51 @@
  */
 package org.jboss.messaging.tests.integration.scheduling;
 
+import java.util.Calendar;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.impl.XidImpl;
 import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.util.id.GUID;
 
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.util.Calendar;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
-public class ScheduledMessageTest extends UnitTestCase
+public class ScheduledMessageTest extends ServiceTestBase
 {
    private static final Logger log = Logger.getLogger(ScheduledMessageTest.class);
 
    
-   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
-
-   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
-
-   private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
-
-   private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
-
-   private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
-
    private SimpleString atestq = new SimpleString("ascheduledtestq");
 
    private SimpleString atestq2 = new SimpleString("ascheduledtestq2");
 
    private MessagingService messagingService;
 
-   private ConfigurationImpl configuration;
+   private Configuration configuration;
 
    protected void setUp() throws Exception
    {
-      File file = new File(journalDir);
-      File file2 = new File(bindingsDir);
-      File file3 = new File(pageDir);
-      deleteDirectory(file);
-      file.mkdirs();
-      deleteDirectory(file2);
-      file2.mkdirs();
-      deleteDirectory(file3);
-      file3.mkdirs();
-      configuration = new ConfigurationImpl();
+      super.clearData();
+      configuration = createDefaultConfig();
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
-      configuration.setPagingDirectory(pageDir);
+      configuration.setPagingMaxGlobalSizeBytes(-1);
+      messagingService = createService(true, configuration);
+      messagingService.start();
    }
 
    protected void tearDown() throws Exception
@@ -101,9 +82,7 @@
             // ignore
          }
       }
-      new File(journalDir).delete();
-      new File(bindingsDir).delete();
-      new File(pageDir).delete();
+      clearData();
    }
 
    public void testRecoveredMessageDeliveredCorrectly() throws Exception
@@ -159,14 +138,8 @@
    public void testPagedMessageDeliveredCorrectly() throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      configuration.setPagingMaxGlobalSizeBytes(0);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
       // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory(); 
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       ClientProducer producer = session.createProducer(atestq);
@@ -197,17 +170,11 @@
 
    public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception
    {
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      configuration.setPagingMaxGlobalSizeBytes(0);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
       QueueSettings qs = new QueueSettings();
       qs.setRedeliveryDelay(5000l);
       messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
       // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       session.createQueue(atestq, atestq2, null, true, true);
@@ -252,17 +219,11 @@
    public void testPagedMessageDeliveredMultipleConsumersAfterRecoverCorrectly() throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      configuration.setPagingMaxGlobalSizeBytes(0);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
       QueueSettings qs = new QueueSettings();
       qs.setRedeliveryDelay(5000l);
       messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
       // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       session.createQueue(atestq, atestq2, null, true, true);
@@ -288,9 +249,9 @@
       session.close();
       messagingService.stop();
       messagingService = null;
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+      messagingService = createService(true, configuration);
       messagingService.start();
-      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      sessionFactory = createInVMFactory();
       session = sessionFactory.createSession(false, true, true, false);
       consumer = session.createConsumer(atestq);
       consumer2 = session.createConsumer(atestq2);
@@ -316,13 +277,8 @@
    public void testMessageDeliveredCorrectly(boolean recover) throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+       // then we create a client as normal
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       ClientProducer producer = session.createProducer(atestq);
@@ -344,9 +300,9 @@
          session.close();
          messagingService.stop();
          messagingService = null;
-         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+         messagingService = createService(true, configuration);
          messagingService.start();
-         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         sessionFactory = createInVMFactory();
          session = sessionFactory.createSession(false, true, true, false);
       }
       ClientConsumer consumer = session.createConsumer(atestq);
@@ -370,13 +326,7 @@
    public void testScheduledMessagesDeliveredCorrectly(boolean recover) throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       ClientProducer producer = session.createProducer(atestq);
@@ -403,10 +353,10 @@
          session.close();
          messagingService.stop();
          messagingService = null;
-         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+         messagingService = createService(true, configuration);
          messagingService.start();
 
-         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         sessionFactory = createInVMFactory();
 
          session = sessionFactory.createSession(false, true, true, false);
       }
@@ -451,13 +401,7 @@
    public void testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean recover) throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       ClientProducer producer = session.createProducer(atestq);
@@ -485,10 +429,10 @@
          session.close();
          messagingService.stop();
          messagingService = null;
-         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+         messagingService = createService(true, configuration);
          messagingService.start();
 
-         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         sessionFactory = createInVMFactory();
 
          session = sessionFactory.createSession(false, true, true, false);
 
@@ -533,13 +477,7 @@
    public void testScheduledAndNormalMessagesDeliveredCorrectly(boolean recover) throws Exception
    {
 
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(false, true, false, false);
       session.createQueue(atestq, atestq, null, true, true);
       ClientProducer producer = session.createProducer(atestq);
@@ -565,10 +503,10 @@
          session.close();
          messagingService.stop();
          messagingService = null;
-         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+         messagingService = createService(true, configuration);
          messagingService.start();
 
-         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         sessionFactory = createInVMFactory();
 
          session = sessionFactory.createSession(false, true, true, false);
       }
@@ -609,13 +547,9 @@
    {
       Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
       Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
-      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+      
+      ClientSessionFactory sessionFactory = createInVMFactory();
       ClientSession session = sessionFactory.createSession(true, false, false, false);
       session.createQueue(atestq, atestq, null, true, false);
       session.start(xid, XAResource.TMNOFLAGS);
@@ -639,10 +573,10 @@
          session.close();
          messagingService.stop();
          messagingService = null;
-         messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+         messagingService = createService(true, configuration);
          messagingService.start();
 
-         sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+         sessionFactory = createInVMFactory();
 
          session = sessionFactory.createSession(true, false, false, false);
       }

Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2008-10-28 23:58:25 UTC (rev 5194)
@@ -138,6 +138,11 @@
       return createService(realFiles, createDefaultConfig(), new HashMap<String, QueueSettings>());
    }
 
+   protected MessagingService createService(final boolean realFiles, final Configuration configuration)
+   {
+      return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
+   }
+
    protected Configuration createDefaultConfig()
    {
       return createDefaultConfig(false);
@@ -164,6 +169,7 @@
       configuration.setJournalMinFiles(2);
       configuration.setJournalFileSize(100 * 1024);
       configuration.setPagingDirectory(pageDir);
+      configuration.setLargeMessagesDirectory(largeMessagesDir);
 
       configuration.getAcceptorConfigurations().clear();
 




More information about the jboss-cvs-commits mailing list