[jboss-cvs] JBoss Messaging SVN: r4930 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 10 22:14:13 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-09-10 22:14:12 -0400 (Wed, 10 Sep 2008)
New Revision: 4930

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
   trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
JBMESSAGING-1299 - Reloading and recovering prepared transactions. Applying fixes to paging as well.

Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -24,9 +24,7 @@
 package org.jboss.messaging.core.journal;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 /**
  * 
@@ -43,7 +41,7 @@
    
    public final List<RecordInfo> records = new ArrayList<RecordInfo>();
    
-   public final Set<RecordInfo> recordsToDelete = new HashSet<RecordInfo>();
+   public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
 
    public PreparedTransactionInfo(final long id, final byte[] extraData)
    {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -382,6 +382,11 @@
    
    public long getTransactionID()
    {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
       return transactionIDSequence.getAndIncrement();
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -23,9 +23,7 @@
 package org.jboss.messaging.core.journal.impl;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.jboss.messaging.core.journal.RecordInfo;
 
@@ -49,7 +47,7 @@
    
    public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
    
-   public final Set<RecordInfo> recordsToDelete = new HashSet<RecordInfo>();
+   public final ArrayList<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
    
    public boolean prepared;
    

Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -35,9 +35,11 @@
 public interface PageTransactionInfo extends EncodingSupport
 {
 
-   void waitCompletion() throws InterruptedException;
+   boolean waitCompletion() throws Exception;
    
    void complete();
+   
+   void forget();
 
    long getRecordID();
 
@@ -50,5 +52,7 @@
    int decrement();
    
    int getNumberOfMessages();
+   
+   void markIncomplete();
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -49,7 +49,7 @@
    // Public --------------------------------------------------------
 
    private final ServerMessage message;
-   private long transactionID;
+   private long transactionID = -1;
    
    public PageMessageImpl(final ServerMessage message, final long transactionID)
    {

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -45,6 +45,7 @@
    private long transactionID;
    private long recordID;
    private CountDownLatch countDownCompleted;
+   private volatile boolean complete;
    
    final AtomicInteger numberOfMessages = new AtomicInteger(0);
    
@@ -108,7 +109,8 @@
    {
       this.transactionID = buffer.getLong();
       this.numberOfMessages.set(buffer.getInt());
-      this.countDownCompleted = null; // if it is being readed, certainly it was committed
+      this.countDownCompleted = null; // if it is being readed, probably it was committed
+      this.complete = true;           // Unless it is a incomplete prepare, which is marked by markIcomplete
    }
    
    public synchronized void encode(final MessagingBuffer buffer)
@@ -124,6 +126,7 @@
 
    public void complete()
    {
+      complete = true;
       /** 
        * this is to avoid a race condition where the transaction still being committed another thread is depaging messages
        */
@@ -133,14 +136,29 @@
    /** 
     * this is to avoid a race condition where the transaction still being committed another thread is depaging messages
     */
-   public void waitCompletion() throws InterruptedException
+   public boolean waitCompletion() throws InterruptedException
    {
       if (countDownCompleted != null)
       {
          countDownCompleted.await();
       }
+      
+      return complete;
    }
    
+   public void forget()
+   {
+      complete = false;
+
+      countDownCompleted.countDown();
+   }
+   
+   public void markIncomplete()
+   {
+      complete = false;
+      countDownCompleted = new CountDownLatch(1);
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -39,7 +39,6 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -79,7 +78,7 @@
    
    // Static --------------------------------------------------------------------------------------------------------------------------
 
-   private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+   private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
    
    //private static final boolean isTrace = log.isTraceEnabled();
    private static final boolean isTrace = true;
@@ -153,7 +152,7 @@
     */
    public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
    {
-      log.info("Depaging....");
+      trace("Depaging....");
       
       /// Depage has to be done atomically, in case of failure it should be back to where it was
       final long depageTransactionID = storageManager.generateTransactionID();
@@ -184,7 +183,7 @@
       for (PageMessage msg: data)
       {
          final long transactionIdDuringPaging = msg.getTransactionID();
-         if (transactionIdDuringPaging > 0)
+         if (transactionIdDuringPaging >= 0)
          {
             final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
             
@@ -194,13 +193,17 @@
             {
                if (isTrace)
                {
-                  trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+                  trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage());
                }
                continue;
             }
             
             // This is to avoid a race condition where messages are depaged before the commit arrived
-            pageTransactionInfo.waitCompletion();
+            if (!pageTransactionInfo.waitCompletion())
+            {
+               trace("Rollback was called after prepare, ignoring message " + msg.getMessage());
+               continue;
+            }
 
             /// Update information about transactions
             if (msg.getMessage().isDurable())
@@ -248,7 +251,7 @@
    
    public void setLastPage(LastPageRecord lastPage) throws Exception
    {
-      System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
+      trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
       this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -62,7 +62,7 @@
    
    void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
    
-   void storeDeleteMessageTransactional(long txID, long messageID, long queueID) throws Exception;
+   void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
    
    /** Used to delete non-messaging data (such as PageTransaction and LasPage) */
    void storeDeleteTransactional(long txID, long recordID) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -250,7 +250,7 @@
    	messageJournal.appendDeleteRecordTransactional(txID, recordID, null);	
    }
    
-   public void storeDeleteMessageTransactional(long txID, long messageID, long queueID) throws Exception
+   public void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception
    {
       messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
    }
@@ -289,63 +289,6 @@
 	
 		idSequence.set(maxID + 1);
 
-		//recover prepared transactions
-      for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
-      {
-         log.trace(preparedTransaction);
-         EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
-         Xid xid = encodingXid.xid;
-
-         Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
-         List<ServerMessage> messages = new ArrayList<ServerMessage>();
-         List<ServerMessage> messagesToDelete = new ArrayList<ServerMessage>();
-         //first get any sent messages for this tx and recreate
-         for (RecordInfo record : preparedTransaction.records)
-         {
-            byte[] data = record.data;
-
-			   ByteBuffer bb = ByteBuffer.wrap(data);
-
-            MessagingBuffer buff = new ByteBufferWrapper(bb);
-
-				ServerMessage message = new ServerMessageImpl(record.id);
-
-				message.decode(buff);
-
-            messages.add(message);
-         }
-         //ok now find if any records to be deleted which aren't necessarily with this tx
-         List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
-         for (RecordInfo record : records)
-         {
-            if(preparedTransaction.recordsToDelete.contains(record.id))
-            {
-               byte[] data = record.data;
-
-			      ByteBuffer bb = ByteBuffer.wrap(data);
-
-			      byte recordType = record.getUserRecordType();
-
-               MessagingBuffer buff = new ByteBufferWrapper(bb);
-
-					ServerMessage message = new ServerMessageImpl(record.id);
-
-					message.decode(buff);
-
-               messagesToDelete.add(message);
-
-               recordsToDelete.add(record);
-            }
-         }
-         //now we recreate the state of the tx and add to th erresource manager
-         tx.replay(messages, messagesToDelete, Transaction.State.PREPARED);
-         resourceManager.putTransaction(xid, tx);
-         //and finally since we've dealt with the records we don't need to process them.
-         for (RecordInfo recordInfo : recordsToDelete)
-         {
-            records.remove(recordInfo);
-         }
-      }
 		for (RecordInfo record: records)
 		{
 			byte[] data = record.data;
@@ -463,8 +406,11 @@
 				}				
 			}
 		}
-	}
-	
+		
+		loadPreparedTransactions(postOffice, queues, resourceManager,preparedTransactions);
+		
+   }
+
 	//Bindings operations
 	
 	public void addBinding(Binding binding) throws Exception
@@ -646,6 +592,118 @@
 	// Private ----------------------------------------------------------------------------------
 	
 	
+   private void loadPreparedTransactions(final PostOffice postOffice,
+         final Map<Long, Queue> queues, ResourceManager resourceManager,
+         List<PreparedTransactionInfo> preparedTransactions) throws Exception
+   {
+      //recover prepared transactions
+      for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
+      {
+         log.trace(preparedTransaction);
+         EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
+         Xid xid = encodingXid.xid;
+
+         Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
+         List<MessageReference> messages = new ArrayList<MessageReference>();
+         List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
+         
+         PageTransactionInfoImpl pageTransactionInfo = null;
+         
+         //first get any sent messages for this tx and recreate
+         for (RecordInfo record : preparedTransaction.records)
+         {
+            byte[] data = record.data;
+
+            ByteBuffer bb = ByteBuffer.wrap(data);
+
+            MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+            byte recordType = record.getUserRecordType();
+
+            switch(recordType)
+            {
+               case ADD_MESSAGE:
+               {
+                  ServerMessage message = new ServerMessageImpl(record.id);
+
+                  message.decode(buff);
+
+                  List<MessageReference> refs = postOffice.route(message);
+                  messages.addAll(refs);
+                  break;
+               }
+               case ACKNOWLEDGE_REF:
+               {
+                  long messageID = record.id;
+
+                  ACKEncoding encoding = new ACKEncoding();
+                  encoding.decode(buff);
+
+
+                  Queue queue = queues.get(encoding.queueID);
+
+                  if (queue == null)
+                  {
+                     throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+                  }
+
+                  MessageReference removed = queue.removeReferenceWithID(messageID);
+
+                  messagesToAck.add(removed);
+                  if (removed == null)
+                  {
+                     throw new IllegalStateException("Failed to remove reference for " + messageID);
+                  }
+                  break;
+               }
+               case PAGE_TRANSACTION:
+               {
+                  pageTransactionInfo = new PageTransactionInfoImpl();
+                  pageTransactionInfo.decode(buff);
+                  pageTransactionInfo.markIncomplete();
+                  break;
+               }
+               default:
+                  log.warn("InternalError: Record type " + recordType + " not recognized. Maybe you're using journal files created on a different version" );
+            }
+         }
+         
+         for (RecordInfo record : preparedTransaction.recordsToDelete)
+         {
+            byte[] data = record.data;
+
+            ByteBuffer bb = ByteBuffer.wrap(data);
+
+            MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+            long messageID = record.id;
+
+            DeleteEncoding encoding = new DeleteEncoding();
+            encoding.decode(buff);
+
+
+            Queue queue = queues.get(encoding.queueID);
+
+            if (queue == null)
+            {
+               throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+            }
+
+            MessageReference removed = queue.removeReferenceWithID(messageID);
+
+            messagesToAck.add(removed);
+            if (removed == null)
+            {
+               throw new IllegalStateException("Failed to remove reference for " + messageID);
+            }
+         }
+         
+         //now we recreate the state of the tx and add to the resource manager
+         tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+         resourceManager.putTransaction(xid, tx);
+      }
+   }
+   
 	private void checkAndCreateDir(String dir, boolean create)
 	{
 		File f = new File(dir);

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.transaction;
 
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -66,7 +67,7 @@
 
    void markAsRollbackOnly(MessagingException messagingException);
 
-   void replay(List<ServerMessage> messages, List<ServerMessage> acknowledgements, State prepared) throws Exception;
+   void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
 
    static enum State
    {

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -24,6 +24,7 @@
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -60,7 +61,7 @@
 
    private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
 
-   private PageTransactionInfoImpl pageTransaction;
+   private PageTransactionInfo pageTransaction;
 
    private final Xid xid;
 
@@ -200,13 +201,11 @@
 
                if (count == 0)
                {
-                  storageManager.storeDeleteTransactional(id, message
-                          .getMessageID());
+                  storageManager.storeDeleteMessageTransactional(id, queue.getPersistenceID(), message.getMessageID());
                }
                else
                {
-                  storageManager.storeAcknowledgeTransactional(id, queue
-                          .getPersistenceID(), message.getMessageID());
+                  storageManager.storeAcknowledgeTransactional(id, queue.getPersistenceID(), message.getMessageID());
                }
 
                containsPersistent = true;
@@ -317,6 +316,11 @@
       {
          storageManager.rollback(id);
       }
+      
+      if (state == State.PREPARED && pageTransaction != null)
+      {
+         pageTransaction.forget();
+      }
 
       Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
@@ -415,22 +419,19 @@
       this.messagingException = messagingException;
    }
 
-   public void replay(List<ServerMessage> messages, List<ServerMessage> acknowledgements, State prepared) throws Exception
+   public void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception
    {
       containsPersistent = true;
-      //acknowledgements.add
-      for (ServerMessage message : messages)
+      refsToAdd.addAll(messages);
+      this.acknowledgements.addAll(acknowledgements);
+      this.pageTransaction = pageTransaction;
+      
+      if (this.pageTransaction != null)
       {
-         List<MessageReference> refs = postOffice.route(message);
-         refsToAdd.addAll(refs);
+         pagingManager.addTransaction(this.pageTransaction);
       }
-      for (ServerMessage message : acknowledgements)
-      {
-          List<MessageReference> refs = postOffice.route(message);
-         this.acknowledgements.addAll(refs);
-      }
+
       state = prepared;
-
    }
 
    public void setContainsPersistent(final boolean containsPersistent)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -28,7 +28,9 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
 import org.jboss.messaging.tests.util.UnitTestCase;
 import org.jboss.messaging.util.SimpleString;
@@ -38,6 +40,8 @@
 import javax.transaction.xa.Xid;
 import java.io.File;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -46,9 +50,12 @@
 {
    private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
    private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+   
+   private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
 
    private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
    private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
+   private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/page";
    private MessagingService messagingService;
    private ClientSession clientSession;
    private ClientProducer clientProducer;
@@ -59,15 +66,20 @@
 
    protected void setUp() throws Exception
    {
+      queueSettings.clear();
       File file = new File(journalDir);
       File file2 = new File(bindingsDir);
+      File file3 = new File(pageDir);
       deleteDirectory(file);
       file.mkdirs();
       deleteDirectory(file2);
       file2.mkdirs();
+      deleteDirectory(file3);
+      file3.mkdirs();
       configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
+      configuration.setPagingDirectory(pageDir);
 
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
@@ -220,6 +232,142 @@
       testMultipleTxReceiveWithRollback(true);  
    }
    
+   
+   public void testPagingServerRestarted() throws Exception
+   {
+      testPaging(true);
+   }
+   
+   public void testPaging() throws Exception
+   {
+      testPaging(false);
+   }
+   
+   public void testPaging(boolean restartServer) throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+      
+      SimpleString pageQueue = new SimpleString("pagequeue");
+      
+      QueueSettings pageQueueSettings = new QueueSettings();
+      pageQueueSettings.setMaxSizeBytes(100*1024);
+      pageQueueSettings.setPageSizeBytes(10*1024);
+      
+      queueSettings.put(pageQueue.toString(), pageQueueSettings);
+
+      addSettings();
+      
+      clientSession.createQueue(pageQueue, pageQueue, null, true, true);
+      
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      
+      ClientProducer pageProducer = clientSession.createProducer(pageQueue);
+      
+      for (int i = 0; i < 1000; i++)
+      {
+         ClientMessage m = createBytesMessage(new byte[512], true);
+         pageProducer.send(m);
+      }
+      
+      pageProducer.close();
+
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      
+      if (restartServer)
+      {
+         stopAndRestartServer();
+      }
+      else
+      {
+         recreateClients();
+      }
+      
+      Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+      assertEquals(xids.length, 1);
+      assertEquals(xids[0].getFormatId(), xid.getFormatId());
+      assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
+      assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+
+      clientSession.commit(xid, true);
+
+      clientSession.start();
+
+      ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
+
+      for (int i = 0; i < 1000; i++)
+      {
+         ClientMessage m = pageConsumer.receive(10000);
+          assertNotNull(m);
+         clientSession.acknowledge();
+      }  
+      
+   }
+   
+   public void testRollbackPaging() throws Exception
+   {
+      testRollbackPaging(false);
+   }
+   
+   public void testRollbackPagingServerRestarted() throws Exception
+   {
+      testRollbackPaging(true);
+   }
+   
+   public void testRollbackPaging(boolean restartServer) throws Exception
+   {
+     Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+      
+      SimpleString pageQueue = new SimpleString("pagequeue");
+      
+      QueueSettings pageQueueSettings = new QueueSettings();
+      pageQueueSettings.setMaxSizeBytes(100*1024);
+      pageQueueSettings.setPageSizeBytes(10*1024);
+      
+      queueSettings.put(pageQueue.toString(), pageQueueSettings);
+
+      addSettings();
+      
+      clientSession.createQueue(pageQueue, pageQueue, null, true, true);
+      
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      
+      ClientProducer pageProducer = clientSession.createProducer(pageQueue);
+      
+      for (int i = 0; i < 1000; i++)
+      {
+         ClientMessage m = createBytesMessage(new byte[512], true);
+         pageProducer.send(m);
+      }
+
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      
+      if (restartServer)
+      {
+         stopAndRestartServer();
+      }
+      else
+      {
+         recreateClients();
+      }
+      
+      Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+      assertEquals(1, xids.length);
+      assertEquals(xids[0].getFormatId(), xid.getFormatId());
+      assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
+      assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+
+      clientSession.rollback(xid);
+
+      clientSession.start();
+
+      ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
+
+      assertNull(pageConsumer.receive(100));
+      
+   }
+   
    public void testNonPersistent() throws Exception
    {
       testNonPersistent(true);
@@ -227,7 +375,7 @@
    }
 
 
-   public void testNonPersistent(boolean commit) throws Exception
+   public void testNonPersistent(final boolean commit) throws Exception
    {
       Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
 
@@ -254,9 +402,52 @@
       assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
       xids = clientSession.recover(XAResource.TMENDRSCAN);
       assertEquals(xids.length, 0);
-      clientSession.commit(xid, true);
+      if (commit)
+      {
+         clientSession.commit(xid, true);
+      }
+      else
+      {
+         clientSession.rollback(xid);
+      }
    }
    
+   public void testNonPersistentMultipleIDs() throws Exception
+   {
+      for (int i = 0; i < 10; i++)
+      {
+         Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+         ClientMessage m1 = createTextMessage("m1", false);
+         ClientMessage m2 = createTextMessage("m2", false);
+         ClientMessage m3 = createTextMessage("m3", false);
+         ClientMessage m4 = createTextMessage("m4", false);
+   
+         clientSession.start(xid, XAResource.TMNOFLAGS);
+         clientProducer.send(m1);
+         clientProducer.send(m2);
+         clientProducer.send(m3);
+         clientProducer.send(m4);
+         clientSession.end(xid, XAResource.TMSUCCESS);
+         clientSession.prepare(xid);
+         
+         if (i == 2)
+         {
+            clientSession.commit(xid, true);
+         }
+         
+         recreateClients();
+         
+         
+      }
+
+      stopAndRestartServer();
+
+      Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+
+      assertEquals(9, xids.length);
+   }
+   
    public void testBasicSendWithCommit(boolean stopServer) throws Exception
    {
       Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
@@ -284,13 +475,14 @@
       }
 
       Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
-
       assertEquals(xids.length, 1);
       assertEquals(xids[0].getFormatId(), xid.getFormatId());
       assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
       assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+      
       xids = clientSession.recover(XAResource.TMENDRSCAN);
       assertEquals(xids.length, 0);
+      
       clientSession.commit(xid, true);
       clientSession.start();
       ClientMessage m = clientConsumer.receive(1000);
@@ -997,10 +1189,21 @@
       messagingService.stop();
       messagingService = null;
       messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+      
+      addSettings();
+      
       messagingService.start();
       createClients();
    }
 
+   private void addSettings()
+   {
+      for (Map.Entry<String, QueueSettings> setting: this.queueSettings.entrySet())
+      {
+         messagingService.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+      }
+   }
+
    protected void recreateClients() throws Exception
    {
       clientSession.close();
@@ -1021,6 +1224,14 @@
       return message;
    }
 
+   private ClientMessage createBytesMessage(byte[] b, boolean durable)
+   {
+      ClientMessage message = clientSession.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBody().putBytes(b);
+      message.getBody().flip();
+      return message;
+   }
+
    private void createClients()
          throws MessagingException
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -142,6 +142,8 @@
       
       Queue queue = EasyMock.createStrictMock(Queue.class);
       
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+      
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
       messageReference.setDeliveryCount(1);
       
@@ -219,6 +221,8 @@
       
       Queue queue = EasyMock.createStrictMock(Queue.class);
       
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
       messageReference.setDeliveryCount(1);
       
@@ -284,6 +288,9 @@
       HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
+      
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
       messageReference.setDeliveryCount(1);
       SimpleString queueName = new SimpleString("queueName");
@@ -323,6 +330,9 @@
       
       ServerMessage serverMessage = EasyMock.createNiceMock(ServerMessage.class);
       Queue queue = EasyMock.createStrictMock(Queue.class);
+      
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
       messageReference.setDeliveryCount(1);
       SimpleString queueName = new SimpleString("queueName");
@@ -377,6 +387,9 @@
       ServerMessage serverMessage = EasyMock.createNiceMock(ServerMessage.class);
       
       Queue queue = EasyMock.createStrictMock(Queue.class);
+      
+      EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
       messageReference.setDeliveryCount(1);
       SimpleString queueName = new SimpleString("queueName");

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -770,7 +770,7 @@
       {
          if (i % 2 == 0)
          {
-            storageManager.storeDeleteTransactional(1, i);
+            storageManager.storeDeleteMessageTransactional(1, queue.getPersistenceID(), i);
          }
       }
 
@@ -778,7 +778,7 @@
       {
          if (i % 2 == 0)
          {
-            storageManager.storeDeleteTransactional(1, i);
+            storageManager.storeDeleteMessageTransactional(1, queue.getPersistenceID(), i);
          }
       }
 
@@ -1341,7 +1341,7 @@
       StorageManager storageManager = EasyMock.createMock(StorageManager.class);
       EasyMock.expect(storageManager.generateTransactionID()).andReturn(randomLong());
       EasyMock.expect(storageManager.generateID()).andReturn(randomLong());
-      storageManager.storeDeleteTransactional(EasyMock.anyLong(), EasyMock.eq(messageID));
+      storageManager.storeDeleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
       storageManager.commit(EasyMock.anyLong());
 
       PostOffice postOffice = createMock(PostOffice.class);
@@ -1399,7 +1399,7 @@
       StorageManager storageManager = createMock(StorageManager.class);
       expect(storageManager.generateTransactionID()).andReturn(randomLong());
       expect(storageManager.generateID()).andReturn(randomLong());
-      storageManager.storeDeleteTransactional(anyLong(), eq(messageID));
+      storageManager.storeDeleteMessageTransactional(anyLong(), eq(queue.getPersistenceID()), eq(messageID));
       storageManager.commit(anyLong());
       
       PostOffice postOffice = createMock(PostOffice.class);
@@ -1460,7 +1460,7 @@
       StorageManager storageManager = EasyMock.createMock(StorageManager.class);
       EasyMock.expect(storageManager.generateID()).andReturn(newMessageID);
       EasyMock.expect(storageManager.generateTransactionID()).andReturn(tid);
-      storageManager.storeDeleteTransactional(EasyMock.anyLong(), EasyMock.eq(messageID));
+      storageManager.storeDeleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
       storageManager.commit(EasyMock.anyLong());
       
       PostOffice postOffice = EasyMock.createMock(PostOffice.class);      

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java	2008-09-11 02:14:12 UTC (rev 4930)
@@ -585,7 +585,7 @@
       //Expect:
       
       sm.storeAcknowledgeTransactional(txID, queue1.getPersistenceID(), message1.getMessageID());
-      sm.storeDeleteTransactional(txID, message1.getMessageID());
+      sm.storeDeleteMessageTransactional(txID, queue2.getPersistenceID(), message1.getMessageID());
       
       EasyMock.replay(sm);
       




More information about the jboss-cvs-commits mailing list