[jboss-cvs] JBoss Messaging SVN: r3456 - in branches/Branch_New_Persistence: src/main/org/jboss/messaging/core/impl/postoffice and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 10 07:53:34 EST 2007


Author: timfox
Date: 2007-12-10 07:53:34 -0500 (Mon, 10 Dec 2007)
New Revision: 3456

Added:
   branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java
   branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java
Removed:
   branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java
Modified:
   branches/Branch_New_Persistence/.classpath
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
   branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java
   branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java
   branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java
   branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java
Log:
More changes


Modified: branches/Branch_New_Persistence/.classpath
===================================================================
--- branches/Branch_New_Persistence/.classpath	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/.classpath	2007-12-10 12:53:34 UTC (rev 3456)
@@ -61,5 +61,6 @@
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="/home/tim/work/je-3.2.44/lib/je-3.2.44.jar"/>
+	<classpathentry kind="lib" path="/home/tim/work/easymock2.3/easymock.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -38,6 +38,7 @@
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.MembershipListener;
+import org.jgroups.MergeView;
 import org.jgroups.Message;
 import org.jgroups.MessageListener;
 import org.jgroups.Receiver;
@@ -455,6 +456,11 @@
       public void viewAccepted(final View newView)
       {     	
       	log.debug(this  + " got new view " + newView + ", old view is " + currentView);
+      	
+      	if (newView instanceof MergeView)
+      	{
+      	   //TODO handle merging after split-brain
+      	}
 		      	
          // JGroups will make sure this method is never called by more than one thread concurrently
 

Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -100,6 +100,7 @@
                       long timestamp, byte priority)
    {
       this.messageID = messageID;
+      this.type = type;
       this.reliable = reliable;
       this.expiration = expiration;
       this.timestamp = timestamp;

Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -26,7 +26,7 @@
 
 import javax.transaction.xa.Xid;
 
-import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
 import org.jboss.messaging.newcore.intf.MessageReference;
 import org.jboss.messaging.newcore.intf.PersistenceManager;
 import org.jboss.messaging.newcore.intf.Transaction;
@@ -41,12 +41,8 @@
  */
 public class TransactionImpl implements Transaction
 {
-   private static final Logger log = Logger.getLogger(TransactionImpl.class);
+   private List<Message> messagesToAdd;
    
-   private static final boolean trace = log.isTraceEnabled();
-      
-   private List<MessageReference> refsToAdd;
-   
    private List<MessageReference> refsToRemove;
    
    private List<TransactionSynchronization> synchronizations = new ArrayList<TransactionSynchronization>();
@@ -55,16 +51,26 @@
    
    private boolean containsPersistent;
    
-   public TransactionImpl(List<MessageReference> refsToAdd, List<MessageReference> refsToRemove,
+   private boolean prepared;
+   
+   public TransactionImpl(List<Message> messagesToAdd, List<MessageReference> refsToRemove,
                           boolean containsPersistent)
    {
-      this.refsToAdd = refsToAdd;
+      this.messagesToAdd = messagesToAdd;
       
       this.refsToRemove = refsToRemove;
       
       this.containsPersistent = containsPersistent;
    }
    
+   public TransactionImpl(Xid xid, List<Message> messagesToAdd, List<MessageReference> refsToRemove,
+                          boolean containsPersistent)
+   {
+      this(messagesToAdd, refsToRemove, containsPersistent);
+      
+      this.xid = xid;
+   }
+   
    // Transaction implementation -----------------------------------------------------------
    
    public void addSynchronization(TransactionSynchronization sync)
@@ -72,7 +78,7 @@
       synchronizations.add(sync);
    }
    
-   public void prepare(PersistenceManager persistenceManager)
+   public void prepare(PersistenceManager persistenceManager) throws Exception
    {
       if (xid == null)
       {
@@ -80,7 +86,9 @@
       }
       else
       {
+         persistenceManager.prepareTransaction(xid, messagesToAdd, refsToRemove);
          
+         prepared = true;
       }
    }
    
@@ -93,52 +101,30 @@
          if (xid == null)
          {
             //1PC commit
-            PersistenceTransaction tx = null;
             
-            try
-            {         
-               tx = persistenceManager.createTransaction(false);
-               
-               playOperations(tx, persistenceManager);
-               
-               tx.commit();
-                                                                          
-            }
-            catch (Exception e)
-            {
-               try
-               {
-                  tx.rollback();
-               }
-               catch (Throwable t)
-               {
-                  if (trace) { log.trace("Failed to rollback", t); }
-               }
-               throw e;
-            }
+            persistenceManager.commitTransaction(messagesToAdd, refsToRemove);
          }
          else
          {
             //2PC commit
             
-            PersistenceTransaction tx = null;
+            if (!prepared)
+            {
+               throw new IllegalStateException("Transaction is not prepared");
+            }
             
-            tx = persistenceManager.getTransaction(xid);
-            
-            playOperations(tx, persistenceManager);
-            
-            tx.commit();
-            
-            
+            persistenceManager.commitPreparedTransaction(xid);                        
          } 
       }
-      
-      
+            
       //Now add to queue(s)
       
-      for (MessageReference reference: refsToAdd)
+      for (Message msg: messagesToAdd)
       {
-         reference.getQueue().addLast(reference);
+         for (MessageReference ref: msg.getReferences())
+         {
+            ref.getQueue().addLast(ref);
+         }
       }
       
       callSynchronizations(SyncType.AFTER_COMMIT);
@@ -154,33 +140,14 @@
       }
       else
       {
-         
+         persistenceManager.unprepareTransaction(xid, messagesToAdd, refsToRemove);
       }
-      callSynchronizations(SyncType.AFTER_ROLLBACK);
       
+      callSynchronizations(SyncType.AFTER_ROLLBACK);      
    }
    
    // Private -------------------------------------------------------------------
    
-   private void playOperations(PersistenceTransaction tx, PersistenceManager persistenceManager) throws Exception
-   {
-      for (MessageReference reference: refsToAdd)
-      {
-         if (reference.getMessage().isReliable())
-         {
-            persistenceManager.addReference(tx, reference.getQueue(), reference);
-         }
-      }
-      
-      for (MessageReference reference: refsToRemove)
-      {
-         if (reference.getMessage().isReliable())
-         {
-            persistenceManager.removeReference(tx, reference.getQueue(), reference);
-         }
-      }
-   }
-   
    private void callSynchronizations(SyncType type) throws Exception
    {
       for (TransactionSynchronization sync: synchronizations)
@@ -197,7 +164,7 @@
          {
             sync.beforeRollback();
          }
-         else if (type == SyncType.AFTER_ROLLBACK);
+         else if (type == SyncType.AFTER_ROLLBACK)
          {
             sync.afterRollback();
          }            

Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -35,8 +35,6 @@
 import org.jboss.messaging.newcore.intf.Queue;
 import org.jboss.messaging.util.Pair;
 
-import com.sleepycat.je.DatabaseEntry;
-
 /**
  * 
  * A PersistenceManager implementation that stores messages using Berkeley DB Java Edition.
@@ -143,12 +141,14 @@
       
       environment.stop();
       
+      recovery = false;
+      
       started = false;
    }
    
    // PersistenceManager implementation ----------------------------------------------------------
 
-   public void commitMessage(Message message) throws Exception
+   public void addMessage(Message message) throws Exception
    {
       BDBJETransaction tx = null;
       
@@ -175,19 +175,18 @@
          }
       }            
    }
-   
-   public void commitMessages(List<Message> messages)
+      
+   public void commitTransaction(List<Message> messagesToAdd,
+                                 List<MessageReference> referencesToRemove) throws Exception
    {
+     
       BDBJETransaction tx = null;
       
       try
       {      
          tx = environment.createTransaction();  
          
-         for (Message message: messages)
-         {
-            internalCommitMessage(tx, message);
-         }
+         playTx(tx, messagesToAdd, referencesToRemove);
          
          tx.commit();
       }
@@ -206,38 +205,46 @@
          }
       }      
    }
-   
-   public void deleteReferences(List<MessageReference> references)
-   {
-      BDBJETransaction tx = null;
-      
+
+   public void prepareTransaction(Xid xid, List<Message> messagesToAdd,
+                                  List<MessageReference> referencesToRemove) throws Exception
+   { 
+      environment.startWork(xid);
+
       try
-      {      
-         tx = environment.createTransaction();
-         
-         for (MessageReference ref: references)
-         {
-            internalDeleteReference(tx, ref);
-         }
-         
-         tx.commit();
+      {         
+         playTx(null, messagesToAdd, referencesToRemove);
       }
       catch (Exception e)
       {
          try
          {
-            if (tx != null)
-            {
-               tx.rollback();
-            }
+            environment.endWork(xid, true);
          }
          catch (Throwable ignore)
          {
-            if (trace) { log.trace("Failed to rollback", ignore); }
+            if (trace) { log.trace("Failed to end", ignore); }
          }
+
+         throw e;
       }
+
+      environment.endWork(xid, false);
+
+      environment.prepare(xid);
    }
 
+   public void commitPreparedTransaction(Xid xid) throws Exception
+   {
+      environment.commit(xid);
+   }
+
+   public void unprepareTransaction(Xid xid, List<Message> messagesToAdd,
+                                    List<MessageReference> referencesToRemove) throws Exception
+   { 
+      environment.rollback(xid);      
+   }
+   
    public void deleteReference(MessageReference reference)
    {
       BDBJETransaction tx = null;
@@ -268,6 +275,11 @@
       
    public List<Xid> getInDoubtXids() throws Exception
    {
+      if (!recovery)
+      {
+         throw new IllegalStateException("Must be in recovery mode to call getInDoubtXids()");
+      }
+      
       return environment.getInDoubtXids();
    }
 
@@ -283,128 +295,107 @@
    
    public void loadQueues(Map<Long, Queue> queues) throws Exception
    {
-      BDBJECursor cursorMessage = this.messageDB.cursor();
+      BDBJECursor cursorMessage = null;
       
-      BDBJECursor cursorRef = this.refDB.cursor();
+      BDBJECursor cursorRef = null;
       
-      Pair<Long, byte[]> messagePair;
-      
-      Pair<Long, byte[]> refPair;
-      
-      while ((messagePair = cursorMessage.getNext()) != null)
+      try
       {
-         refPair = cursorRef.getNext();
+         cursorMessage = messageDB.cursor();
          
-         if (refPair == null)
-         {
-            throw new IllegalStateException("Message and ref data out of sync");
-         }
-                 
-         long id = messagePair.a;
+         cursorRef = refDB.cursor();
          
-         byte[] bytes = messagePair.b;
-               
-         ByteBuffer buffer = ByteBuffer.wrap(bytes);
+         Pair<Long, byte[]> messagePair;
          
-         int type = buffer.getInt();
+         Pair<Long, byte[]> refPair;
          
-         long expiration = buffer.getLong();
-         
-         long timestamp = buffer.getLong();
-         
-         byte priority = buffer.get();
-         
-         int headerSize = buffer.getInt();
-         
-         //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
-         
-         byte[] headers = new byte[headerSize];
-         
-         buffer.get(headers);
-         
-         int payloadSize = buffer.getInt();
-         
-         byte[] payload = new byte[payloadSize];
-         
-         buffer.get(payload);
-         
-         Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
-                                           headers, payload);
-         
-         //Now the ref data
-         
-         byte[] refBytes = refPair.b;
-         
-         buffer = ByteBuffer.wrap(refBytes);
-         
-         while (buffer.hasRemaining())
+         while ((messagePair = cursorMessage.getNext()) != null)
          {
-            long queueID = buffer.getLong();
+            refPair = cursorRef.getNext();
             
-            int deliveryCount = buffer.getInt();
+            if (refPair == null)
+            {
+               throw new IllegalStateException("Message and ref data out of sync");
+            }
+                    
+            long id = messagePair.a;
             
-            long scheduledDeliveryTime = buffer.getLong();
+            byte[] bytes = messagePair.b;
+                  
+            ByteBuffer buffer = ByteBuffer.wrap(bytes);
             
-            Queue queue = queues.get(queueID);
+            int type = buffer.getInt();
             
-            if (queue == null)
+            long expiration = buffer.getLong();
+            
+            long timestamp = buffer.getLong();
+            
+            byte priority = buffer.get();
+            
+            int headerSize = buffer.getInt();
+            
+            //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
+            
+            byte[] headers = new byte[headerSize];
+            
+            buffer.get(headers);
+            
+            int payloadSize = buffer.getInt();
+            
+            byte[] payload = new byte[payloadSize];
+            
+            buffer.get(payload);
+            
+            Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
+                                              headers, payload);
+            
+            //Now the ref data
+            
+            byte[] refBytes = refPair.b;
+            
+            buffer = ByteBuffer.wrap(refBytes);
+            
+            while (buffer.hasRemaining())
             {
-               //Ok - queue is not deployed
-            }
-            else
-            {
-               MessageReference reference = message.createReference(queue);
+               long queueID = buffer.getLong();
                
-               reference.setDeliveryCount(deliveryCount);
+               int deliveryCount = buffer.getInt();
                
-               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+               long scheduledDeliveryTime = buffer.getLong();
                
-               queue.addLast(reference);
-            }    
-         }         
-      }     
-   }
-
-   public void prepareMessages(Xid xid, List<Message> messages) throws Exception
-   {      
-      environment.startWork(xid);
-      
-      try
-      {         
-         for (Message message: messages)
-         {
-            internalCommitMessage(null, message);
+               Queue queue = queues.get(queueID);
+               
+               if (queue == null)
+               {
+                  //Ok - queue is not deployed
+               }
+               else
+               {
+                  MessageReference reference = message.createReference(queue);
+                  
+                  reference.setDeliveryCount(deliveryCount);
+                  
+                  reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+                  
+                  queue.addLast(reference);
+               }    
+            }  
          }
       }
-      catch (Exception e)
+      finally
       {
-         try
+         if (cursorMessage != null)
          {
-            environment.endWork(xid, true);
+            cursorMessage.close();
          }
-         catch (Throwable ignore)
+         
+         if (cursorRef != null)
          {
-            if (trace) { log.trace("Failed to end", ignore); }
+            cursorRef.close();
          }
-         
-         throw e;
-      }
+      }     
+   }
       
-      environment.endWork(xid, false);
-      
-      environment.prepare(xid);
-   }
-   
-   public void commitPreparedMessages(Xid xid) throws Exception
-   {
-      environment.commit(xid);
-   }
-
-   public void unprepareMessages(Xid xid, List<Message> messages) throws Exception
-   {
-      environment.rollback(xid);      
-   }
-   
    public void updateDeliveryCount(Queue queue, MessageReference ref) throws Exception
    {
       //TODO - optimise this scan
@@ -455,8 +446,27 @@
    }
    
    // Private ---------------------------------------------------------------------------------
-   
-   
+
+   private void playTx(BDBJETransaction tx, List<Message> messagesToAdd,
+                       List<MessageReference> referencesToRemove) throws Exception
+   {
+      if (messagesToAdd != null)
+      {
+         for (Message message: messagesToAdd)
+         {
+            internalCommitMessage(tx, message);
+         }
+      }
+
+      if (referencesToRemove != null)
+      {
+         for (MessageReference ref: referencesToRemove)
+         {
+            internalDeleteReference(tx, ref);
+         }
+      }
+   }
+
    private void internalCommitMessage(BDBJETransaction tx, Message message) throws Exception
    {
       //First store the message

Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -41,33 +41,33 @@
     * to 1 or more queues.
     * @param message
     */
-   public void commitMessage(Message message) throws Exception;
+   public void addMessage(Message message) throws Exception;
    
    /**
-    * Multiple messages, each potentially with multiple message references needs to be added to storage
-    * This would occur when a 1 PC transaction containing many messages arrives on the server and they need to be routed
-    * to 1 or more queues.
-    * @param messages
+    * Delete a single reference. This would also delete the message if it is no longer referenced by any other
+    * references.
+    * This would occur on acknowledgement of a single reference
+    * @param reference the reference to delete
     */
-   public void commitMessages(List<Message> messages) throws Exception;
-   
+   void deleteReference(MessageReference reference) throws Exception;
+      
    /**
-    * Multiple messages, each potentially with multiple message references needs to be added to storage
-    * in prepared state - used for XA functionality
-    * This would occur when a 2 PC transaction containing many messages arrives on the server and they need to routed
-    * to 1 or more queues, but prepared first
-    * @param xid - the Xid of the transaction
-    * @param messages
+    * Commit a transaction containing messages to add and references to remove
+    * @param messagesToAdd List of messages to add, or null if none
+    * @param referencesToRemove List of references to remove, or null if none
+    * @throws Exception
     */
-   public void prepareMessages(Xid xid, List<Message> messages) throws Exception;
+   public void commitTransaction(List<Message> messagesToAdd, List<MessageReference> referencesToRemove) throws Exception;
    
    /**
-    * Unprepare a List of messages - each with potentially many message references
-    * This would occur when a 2 PC transaction rolls back
-    * @param xid - the Xid of the transaction
-    * @param messages
+    * Prepare a transaction containing messages to add and references to remove
+    * @param xid The Xid of the XA transaction
+    * @param messagesToAdd List of messages to add, or null if none
+    * @param referencesToRemove List of references to remove, or null if none
+    * @throws Exception
     */
-   public void unprepareMessages(Xid xid, List<Message> messages) throws Exception;
+   public void prepareTransaction(Xid xid, List<Message> messagesToAdd,
+                                  List<MessageReference> referencesToRemove) throws Exception;
    
    /**
     * Commit a prepared transaction
@@ -75,24 +75,20 @@
     * @param xid
     * @throws Exception
     */
-   public void commitPreparedMessages(Xid xid) throws Exception;
+   public void commitPreparedTransaction(Xid xid) throws Exception;
    
-   /**
-    * Delete a single reference. This would also delete the message if it is no longer referenced by any other
-    * references.
-    * This would occur on acknowledgement of a single reference
-    * @param message
-    */
-   void deleteReference(MessageReference reference) throws Exception;
    
    /**
-    * Delete a list of references from storage, also deleting their corresponding messages if they are no longer
-    * referenced.
-    * This would occur on acknowledgement of multiple references in a transaction.
-    * @param messages
+    * Unprepare a transaction containing messages to add and references to remove
+    * @param xid The Xid of the XA transaction
+    * @param messagesToAdd List of messages to add, or null if none
+    * @param referencesToRemove List of references to remove, or null if none
+    * @throws Exception
     */
-   void deleteReferences(List<MessageReference> references) throws Exception;
+   public void unprepareTransaction(Xid xid, List<Message> messagesToAdd,
+                                    List<MessageReference> referencesToRemove) throws Exception;
    
+
    /**
     * Update the delivery count of a reference
     * @param queue

Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.newcore.impl.QueueImpl;
 import org.jboss.messaging.newcore.intf.HandleStatus;
+import org.jboss.messaging.newcore.intf.Message;
 import org.jboss.messaging.newcore.intf.MessageReference;
 import org.jboss.messaging.newcore.intf.Queue;
 import org.jboss.test.unit.fakes.messaging.newcore.impl.FakeConsumer;
@@ -128,8 +129,10 @@
          
          while (System.currentTimeMillis() - start < testTime)
          {
-            MessageReference ref = generateReference(queue, i);
+            Message message = generateMessage(i);
             
+            MessageReference ref = message.createReference(queue);
+            
             queue.addLast(ref);
             
             refs.add(ref);

Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -146,9 +146,25 @@
    {
       Message message = new MessageImpl(id, 0, true, 0, System.currentTimeMillis(), (byte)4);
       
+      byte[] bytes = new byte[1024];
+      
+      for (int i = 0; i < 1024; i++)
+      {
+         bytes[i] = (byte)i;
+      }
+      
+      message.setPayload(bytes);
+      
       return message;
    }
    
+   protected MessageReference generateReference(Queue queue, long id)
+   {
+      Message message = generateMessage(id);
+      
+      return message.createReference(queue);
+   }
+   
    protected void assertEquivalent(Message msg1, Message msg2)
    {
       assertEquals(msg1.getMessageID(), msg2.getMessageID());
@@ -185,6 +201,8 @@
          assertEquals(ref1.getScheduledDeliveryTime(), ref2.getScheduledDeliveryTime());
          
          assertEquals(ref1.getDeliveryCount(), ref2.getDeliveryCount());
+         
+         assertEquals(ref1.getQueue(), ref2.getQueue());
       }
    }
    

Deleted: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -1,681 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.unit.messaging.newcore.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.newcore.impl.MessageImpl;
-import org.jboss.messaging.newcore.impl.QueueImpl;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEPersistenceManager;
-import org.jboss.messaging.newcore.intf.Message;
-import org.jboss.messaging.newcore.intf.MessageReference;
-import org.jboss.messaging.newcore.intf.Queue;
-import org.jboss.test.unit.fakes.messaging.newcore.impl.bdbje.FakeBDBJEEnvironment;
-import org.jboss.test.unit.messaging.UnitTestCase;
-
-/**
- * 
- * A BDBJEPersistenceManagerTest
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class BDBJEPersistenceManagerTest extends UnitTestCase
-{
-   protected static final String ENV_DIR = "test-env";
-   
-   protected BDBJEPersistenceManager pm;
-      
-   protected BDBJEEnvironment bdb;
-   
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      bdb = new FakeBDBJEEnvironment();
-      
-      pm = new BDBJEPersistenceManager(bdb, ENV_DIR);
-      
-      pm.start();
-   }
-   
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-      
-      pm.stop();
-   }
-   
-   // The tests ----------------------------------------------------------------
-            
-   public void testCommitMessage() throws Exception
-   {      
-      Message m = generateMessage(1);
-      
-      Queue queue = new QueueImpl(67);
-      
-      m.createReference(queue);
-      
-      m.createReference(queue);
-      
-      m.createReference(queue);
-      
-      m.createReference(queue);
-      
-      pm.commitMessage(m);
-      
-      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-      
-      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-      
-      byte[] msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNotNull(msgBytes);
-      
-      byte[] refBytes = refDB.get(m.getMessageID());
-      
-      assertNotNull(refBytes);
-      
-      Map<Long, Queue> queues = new HashMap<Long, Queue>();
-      
-      queues.put(queue.getID(), queue);
-      
-      Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-       
-      assertEquivalent(m, m2);               
-   }
-   
-   public void testCommitMessages() throws Exception
-   {      
-      List<Message> msgs = new ArrayList<Message>();
-      
-      final int numMessages = 10;
-      
-      Queue queue = new QueueImpl(67);
-            
-      for (int i = 0; i < numMessages; i++)
-      {      
-         Message m = generateMessage(i);
-                  
-         m.createReference(queue);
-         
-         m.createReference(queue);
-         
-         m.createReference(queue);
-         
-         m.createReference(queue);
-         
-         msgs.add(m);
-      }
-      
-      pm.commitMessages(msgs);
-      
-      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-      
-      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-      
-      for (int i = 0; i < numMessages; i++)
-      {       
-         Message m = msgs.get(i);
-         
-         byte[] msgBytes = msgDB.get(m.getMessageID());
-         
-         assertNotNull(msgBytes);
-         
-         byte[] refBytes = refDB.get(m.getMessageID());
-         
-         assertNotNull(refBytes);
-         
-         Map<Long, Queue> queues = new HashMap<Long, Queue>();
-         
-         queues.put(queue.getID(), queue);
-         
-         Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-          
-         assertEquivalent(m, m2); 
-      }            
-   }
-   
-   public void testDeleteReference() throws Exception
-   {      
-      Message m = generateMessage(1);
-      
-      Queue queue = new QueueImpl(67);
-      
-      MessageReference ref1 = m.createReference(queue);
-      
-      MessageReference ref2 = m.createReference(queue);
-      
-      MessageReference ref3 = m.createReference(queue);
-      
-      MessageReference ref4 = m.createReference(queue);
-      
-      pm.commitMessage(m);
-      
-      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-      
-      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-      
-      byte[] msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNotNull(msgBytes);
-      
-      byte[] refBytes = refDB.get(m.getMessageID());
-      
-      assertNotNull(refBytes);
-      
-      Map<Long, Queue> queues = new HashMap<Long, Queue>();
-      
-      queues.put(queue.getID(), queue);
-      
-      Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-       
-      assertEquivalent(m, m2);          
-      
-      assertEquals(4, m2.getReferences().size());
-      
-      
-      pm.deleteReference(ref2);
-      
-      msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNotNull(msgBytes);
-      
-      refBytes = refDB.get(m.getMessageID());
-      
-      assertNotNull(refBytes);
-      
-      queues = new HashMap<Long, Queue>();
-      
-      queues.put(queue.getID(), queue);
-      
-      m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-       
-      assertEquivalent(m, m2);          
-      
-      assertEquals(3, m2.getReferences().size());
-      
-      
-      pm.deleteReference(ref1);
-      
-      msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNotNull(msgBytes);
-      
-      refBytes = refDB.get(m.getMessageID());
-      
-      assertNotNull(refBytes);
-      
-      queues = new HashMap<Long, Queue>();
-      
-      queues.put(queue.getID(), queue);
-      
-      m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-       
-      assertEquivalent(m, m2);          
-      
-      assertEquals(2, m2.getReferences().size());
-      
-      
-      pm.deleteReference(ref3);
-      
-      msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNotNull(msgBytes);
-      
-      refBytes = refDB.get(m.getMessageID());
-      
-      assertNotNull(refBytes);
-      
-      queues = new HashMap<Long, Queue>();
-      
-      queues.put(queue.getID(), queue);
-      
-      m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-       
-      assertEquivalent(m, m2);          
-      
-      assertEquals(1, m2.getReferences().size());
-      
-      
-      pm.deleteReference(ref4);
-      
-      msgBytes = msgDB.get(m.getMessageID());
-      
-      assertNull(msgBytes);
-      
-      refBytes = refDB.get(m.getMessageID());
-      
-      assertNull(refBytes);            
-   }
-   
-   public void testDeleteReferences() throws Exception
-   {
-      //pm.deleteReferences(references)
-      
-   //   pm.
-   }
-   
-   
-   
-   
-//         
-//   public void testAddMultipleReferencesForSameMessageNoTx() throws Exception
-//   {
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//                  
-//      Queue queue = new QueueImpl(67);
-//      
-//      pm.addReference(null, queue, ref1);
-//      
-//      MessageReference ref2 = ref1.getMessage().createReference();
-//      
-//      pm.addReference(null, queue, ref2);
-//      
-//      MessageReference ref3 = ref1.getMessage().createReference();
-//            
-//      pm.addReference(null, queue, ref3);
-//      
-//      MessageReference ref4 = ref1.getMessage().createReference();      
-//      
-//      pm.addReference(null, queue, ref4);
-//      
-//      byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(4, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);      
-//   }
-//   
-//   public void testRemoveReferenceNoTx() throws Exception
-//   {
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//                  
-//      Queue queue = new QueueImpl(67);
-//      
-//      pm.addReference(null, queue, ref1);
-//      
-//      MessageReference ref2 = ref1.getMessage().createReference();
-//      
-//      pm.addReference(null, queue, ref2);
-//      
-//      MessageReference ref3 = ref1.getMessage().createReference();
-//            
-//      pm.addReference(null, queue, ref3);
-//      
-//      MessageReference ref4 = ref1.getMessage().createReference();      
-//      
-//      pm.addReference(null, queue, ref4);
-//      
-//      byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(4, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);      
-//      
-//            
-//      pm.removeReference(null, queue, ref3);
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(3, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);  
-//      
-//      
-//      pm.removeReference(null, queue, ref1);
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(2, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);  
-//      
-//            
-//      pm.removeReference(null, queue, ref2);
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(1, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);  
-//      
-//      
-//      pm.removeReference(null, queue, ref4);
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNull(bytes);      
-//      
-//      
-//      assertTrue(fakeBDB.getStore().isEmpty());                  
-//   }
-//   
-//   public void testUpdateDeliveryCount() throws Exception
-//   {
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//                  
-//      Queue queue = new QueueImpl(67);
-//      
-//      pm.addReference(null, queue, ref1);
-//      
-//      MessageReference ref2 = ref1.getMessage().createReference();
-//      
-//      pm.addReference(null, queue, ref2);
-//      
-//      MessageReference ref3 = ref1.getMessage().createReference();
-//            
-//      pm.addReference(null, queue, ref3);
-//      
-//      MessageReference ref4 = ref1.getMessage().createReference();      
-//      
-//      pm.addReference(null, queue, ref4);
-//                  
-//      ref2.setDeliveryCount(765);
-//      
-//      pm.updateDeliveryCount(queue, ref2);
-//      
-//      byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(4, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);  
-//      
-//      assertEquals(765, ref2.getDeliveryCount());
-//      
-//      assertEquals(ref2.getDeliveryCount(), message.getReferences().get(1).getDeliveryCount());                  
-//      
-//      ref4.setDeliveryCount(10101);
-//      
-//      pm.updateDeliveryCount(queue, ref4);
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertEquals(4, message.getReferences().size());
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);  
-//      
-//      assertEquals(10101, ref4.getDeliveryCount());
-//      
-//      assertEquals(ref4.getDeliveryCount(), message.getReferences().get(3).getDeliveryCount());
-//      
-//      assertEquals(ref1.getDeliveryCount(), message.getReferences().get(0).getDeliveryCount());
-//      
-//      assertEquals(ref3.getDeliveryCount(), message.getReferences().get(2).getDeliveryCount());
-//   }
-//   
-//   public void testAddReferenceTxCommit() throws Exception
-//   {
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//      
-//      Queue queue = new QueueImpl(67);
-//      
-//      PersistenceTransaction tx = pm.createTransaction();
-//      
-//      pm.addReference(tx, queue, ref1);
-//      
-//      byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNull(bytes);
-//      
-//      tx.commit();
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNotNull(bytes);
-//      
-//      Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//      
-//      assertTrue(message.isReliable());
-//      
-//      assertEquivalent(ref1.getMessage(), message);                  
-//   }
-//   
-//   public void testAddReferenceTxRollback() throws Exception
-//   {
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//      
-//      Queue queue = new QueueImpl(67);
-//      
-//      PersistenceTransaction tx = pm.createTransaction();
-//      
-//      pm.addReference(tx, queue, ref1);
-//      
-//      byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNull(bytes);
-//      
-//      tx.rollback();
-//      
-//      bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//      
-//      assertNull(bytes);                 
-//   }
-//   
-//   /*
-//   test adding different refs for different queues in same tx
-//   
-//   plus removing refs for different queues in same tx
-//   
-//   test loading queues (multiple quuees)
-//   
-//   test loading queues with different queues holding refs for same message (like subs)
-//   */
-//   
-//   public void testLoadQueues() throws Exception
-//   {
-//      Queue queue1 = new QueueImpl(1);
-//      
-//      Queue queue2 = new QueueImpl(2);
-//      
-//      Queue queue3 = new QueueImpl(3);
-//      
-//      FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//      
-//      PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//      
-//      MessageReference ref1 = generateReference(1);
-//      MessageReference ref2 = generateReference(2);
-//      MessageReference ref3 = generateReference(3);
-//      MessageReference ref4 = generateReference(4);
-//      MessageReference ref5 = generateReference(5);
-//      MessageReference ref6 = generateReference(6);
-//      MessageReference ref7 = generateReference(7);
-//      MessageReference ref8 = generateReference(8);
-//      MessageReference ref9 = generateReference(9);
-//      MessageReference ref10 = generateReference(10);
-//      
-//      pm.addReference(null, queue1, ref1);
-//      pm.addReference(null, queue1, ref2);
-//      pm.addReference(null, queue1, ref3);
-//      
-//      pm.addReference(null, queue2, ref4);
-//      pm.addReference(null, queue2, ref5);
-//      pm.addReference(null, queue2, ref6);
-//      
-//      pm.addReference(null, queue3, ref7);
-//      pm.addReference(null, queue3, ref8);
-//      pm.addReference(null, queue3, ref9);
-//      pm.addReference(null, queue3, ref10);
-//      
-//      assertEquals(10, fakeBDB.getStore().size());
-//      
-//      Map<Long, Queue> map = new HashMap<Long, Queue>();
-//      
-//      map.put(queue1.getID(), queue1);
-//      map.put(queue2.getID(), queue2);
-//      map.put(queue3.getID(), queue3);
-//      
-//      pm.loadQueues(map);
-//      
-//      assertEquals(3, queue1.getMessageCount());
-//      assertEquals(3, queue2.getMessageCount());
-//      assertEquals(4, queue3.getMessageCount());
-//      
-//      List<MessageReference> refs = new ArrayList<MessageReference>();
-//      
-//      refs.add(ref1);
-//      refs.add(ref2);
-//      refs.add(ref3);
-//      assertRefListsEquivalent(queue1.list(null), refs);
-//      
-//      refs.clear();     
-//      refs.add(ref4);
-//      refs.add(ref5);
-//      refs.add(ref6);
-//      assertRefListsEquivalent(queue2.list(null), refs);
-//      
-//      refs.clear();
-//      refs.add(ref7);
-//      refs.add(ref8);
-//      refs.add(ref9);
-//      refs.add(ref10);
-//      assertRefListsEquivalent(queue3.list(null), refs);
-//      
-//   }
-//   
-//   // Private --------------------------------------------------------------------
-   
-   private Message extractMessage(Map<Long, Queue> queues, long id, byte[] msgBytes, byte[] refBytes) throws Exception
-   {
-      ByteBuffer buffer = ByteBuffer.wrap(msgBytes);
-      
-      int type = buffer.getInt();
-      
-      long expiration = buffer.getLong();
-      
-      long timestamp = buffer.getLong();
-      
-      byte priority = buffer.get();
-      
-      int headerSize = buffer.getInt();
-      
-      byte[] headers = new byte[headerSize];
-      
-      buffer.get(headers);
-      
-      int payloadSize = buffer.getInt();
-      
-      byte[] payload = null;
-      
-      if (payloadSize != 0)
-      {
-         payload = new byte[payloadSize];
-         
-         buffer.get(payload);
-      }
-      
-      Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
-                                        headers, payload);
-      
-      buffer = ByteBuffer.wrap(refBytes);
-      
-      while (buffer.hasRemaining())
-      {
-         long queueID = buffer.getLong();
-         
-         int deliveryCount = buffer.getInt();
-         
-         long scheduledDeliveryTime = buffer.getLong();
-         
-         MessageReference reference = message.createReference(queues.get(queueID));
-         
-         reference.setDeliveryCount(deliveryCount);
-         
-         reference.setScheduledDeliveryTime(scheduledDeliveryTime);
-      } 
-      
-      return message;
-   }
-   
-
-   // Inner classes ---------------------------------------------------------------
-        
-}

Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java	2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -35,7 +35,7 @@
  * 
  * Tests for Message and MessageReference
  * 
- * TODO - Test header and payload, streaming and destreaming
+ * TODO - Test streaming and destreaming
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -146,6 +146,18 @@
       assertEquals(connectionID, message.getConnectionID());      
    }
    
+   public void testSetAndGetPayload()
+   {
+      Message message = new MessageImpl();
+      
+      assertNull(message.getPayload());
+      
+      byte[] bytes = "blah blah blah".getBytes();
+      message.setPayload(bytes);
+      
+      assertByteArraysEquivalent(bytes, message.getPayload());            
+   }
+   
    public void testHeaders()
    {
       Message message = new MessageImpl();

Added: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -0,0 +1,326 @@
+package org.jboss.test.unit.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.newcore.impl.QueueImpl;
+import org.jboss.messaging.newcore.impl.TransactionImpl;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.newcore.intf.Transaction;
+import org.jboss.messaging.newcore.intf.TransactionSynchronization;
+import org.jboss.test.unit.messaging.UnitTestCase;
+
+/**
+ * 
+ * A TransactionTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionTest extends UnitTestCase
+{
+   
+   public void test1PCCommit() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+                  
+      Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      pm.commitTransaction(msgsToAdd, refsToRemove);
+      
+      EasyMock.replay(pm);
+      
+      tx.commit(pm);
+      
+      EasyMock.verify(pm);
+      
+      assertEquals(ref1, queue.list(null).get(0));
+   }
+   
+   public void test1PCRollback() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+                  
+      Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      EasyMock.replay(pm);
+      
+      tx.rollback(pm);
+      
+      EasyMock.verify(pm);
+ 
+      assertTrue(queue.list(null).isEmpty());
+   }
+   
+   public void test1PCPrepare() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+                  
+      Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      try
+      {
+         tx.prepare(pm);
+         fail("Should throw exception");
+      }
+      catch (IllegalStateException e)
+      {
+         //OK
+      }   
+      
+      assertTrue(queue.list(null).isEmpty());
+   }
+   
+   public void test2PCPrepareCommit() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+      
+      Xid xid = generateXid();
+                  
+      Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      pm.prepareTransaction(xid, msgsToAdd, refsToRemove);
+      
+      EasyMock.replay(pm);
+      
+      tx.prepare(pm);
+      
+      EasyMock.verify(pm);
+      
+      EasyMock.reset(pm);
+      
+      pm.commitPreparedTransaction(xid);
+      
+      EasyMock.replay(pm);
+      
+      tx.commit(pm);
+      
+      EasyMock.verify(pm);
+   }
+   
+   public void test2PCCommitBeforePrepare() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+      
+      Xid xid = generateXid();
+                  
+      Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      try
+      {    
+         tx.commit(pm);
+         
+         fail ("Should throw exception");
+      }
+      catch (IllegalStateException e)
+      {
+         //Ok
+      }      
+   }
+   
+   public void test2PCPrepareRollback() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+      
+      Xid xid = generateXid();
+                  
+      Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      pm.prepareTransaction(xid, msgsToAdd, refsToRemove);
+      
+      EasyMock.replay(pm);
+      
+      tx.prepare(pm);
+      
+      EasyMock.verify(pm);
+      
+      EasyMock.reset(pm);
+      
+      pm.unprepareTransaction(xid, msgsToAdd, refsToRemove);
+      
+      EasyMock.replay(pm);
+      
+      tx.rollback(pm);
+      
+      EasyMock.verify(pm);
+   }
+   
+   public void testSynchronizations() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+                  
+      Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+      
+      TransactionSynchronization sync = EasyMock.createStrictMock(TransactionSynchronization.class);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      tx.addSynchronization(sync);
+      
+      sync.beforeCommit();
+      sync.afterCommit();
+      
+      EasyMock.replay(sync);
+      
+      tx.commit(pm);
+      
+      EasyMock.verify(sync);
+      
+      EasyMock.reset(sync);
+      
+      tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+      
+      tx.addSynchronization(sync);
+      
+      sync.beforeRollback();
+      sync.afterRollback();
+      
+      EasyMock.replay(sync);
+      
+      tx.rollback(pm);
+      
+      EasyMock.verify(sync);            
+   }
+   
+   public void testSynchronizations2PC() throws Exception
+   {
+      List<Message> msgsToAdd = new ArrayList<Message>();
+      
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+      
+      Queue queue = new QueueImpl(1);
+      
+      MessageReference ref1 = this.generateReference(queue, 1);
+      msgsToAdd.add(ref1.getMessage());
+      
+      MessageReference ref2 = this.generateReference(queue, 2);
+      refsToRemove.add(ref2);
+      
+      Xid xid = generateXid();
+                  
+      Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+      
+      TransactionSynchronization sync = EasyMock.createStrictMock(TransactionSynchronization.class);
+      
+      PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+      
+      tx.addSynchronization(sync);
+      
+      sync.beforeCommit();
+      sync.afterCommit();
+      
+      EasyMock.replay(sync);
+      
+      tx.prepare(pm);
+      tx.commit(pm);
+      
+      EasyMock.verify(sync);
+      
+      EasyMock.reset(sync);
+      
+      xid = generateXid();
+      
+      tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+      
+      tx.addSynchronization(sync);
+      
+      sync.beforeRollback();
+      sync.afterRollback();
+      
+      EasyMock.replay(sync);
+      
+      tx.prepare(pm);
+      tx.rollback(pm);
+      
+      EasyMock.verify(sync);            
+   }
+
+}

Copied: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java (from rev 3414, branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java)
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java	                        (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java	2007-12-10 12:53:34 UTC (rev 3456)
@@ -0,0 +1,786 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.unit.messaging.newcore.impl.bdbje;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.newcore.impl.MessageImpl;
+import org.jboss.messaging.newcore.impl.QueueImpl;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEPersistenceManager;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.test.unit.fakes.messaging.newcore.impl.bdbje.FakeBDBJEEnvironment;
+import org.jboss.test.unit.messaging.UnitTestCase;
+
+/**
+ * 
+ * Unit tests for BDBJEPersistenceManager, run against a FakeBDBJEEnvironment.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class BDBJEPersistenceManagerTest extends UnitTestCase
+{
+   protected static final String ENV_DIR = "test-env";
+   
+   protected BDBJEPersistenceManager pm;
+      
+   protected BDBJEEnvironment bdb;
+   
+   /**
+    * Builds the fixture for each test: a FakeBDBJEEnvironment and a
+    * BDBJEPersistenceManager created on top of it (using ENV_DIR), which is
+    * then started.
+    */
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      this.bdb = new FakeBDBJEEnvironment();
+      this.pm = new BDBJEPersistenceManager(this.bdb, ENV_DIR);
+      this.pm.start();
+   }
+   
+   /**
+    * Runs the superclass tear-down and then stops the persistence manager.
+    * The stop is done in a finally block so that the manager started in
+    * setUp() is still stopped when super.tearDown() throws.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         super.tearDown();
+      }
+      finally
+      {
+         pm.stop();
+      }
+   }
+   
+   // The tests ----------------------------------------------------------------
+              
+   /**
+    * Adds one message (created by createMessageWithRefs) to the persistence
+    * manager and checks that an equivalent message can be read back from the store.
+    */
+   public void testAddMessage() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(67);
+      final Message message = createMessageWithRefs(1, targetQueue);
+
+      pm.addMessage(message);
+
+      assertMessageInStore(message, targetQueue);
+   }
+         
+   /**
+    * Stores a message with four references and deletes them one at a time, in
+    * the order 2, 1, 3, 0 of the initial reference list. After each of the first
+    * three deletions the message must still be in the store and must hold exactly
+    * the references not yet deleted; after the last deletion the message must be
+    * gone and the store must be empty.
+    */
+   public void testDeleteReference() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(67);
+      final Message message = createMessageWithRefs(1, targetQueue);
+
+      // Copy of the initial references so the four handles stay available
+      // while references are being deleted from the message
+      final List<MessageReference> snapshot = new ArrayList<MessageReference>(message.getReferences());
+      final MessageReference ref0 = snapshot.get(0);
+      final MessageReference ref1 = snapshot.get(1);
+      final MessageReference ref2 = snapshot.get(2);
+      final MessageReference ref3 = snapshot.get(3);
+
+      pm.addMessage(message);
+      assertMessageInStore(message, targetQueue);
+
+      // Delete the third reference: 0, 1 and 3 remain
+      pm.deleteReference(ref2);
+      assertMessageInStore(message, targetQueue);
+      assertEquals(3, message.getReferences().size());
+      assertTrue(message.getReferences().contains(ref0));
+      assertTrue(message.getReferences().contains(ref1));
+      assertTrue(message.getReferences().contains(ref3));
+
+      // Delete the second reference: 0 and 3 remain
+      pm.deleteReference(ref1);
+      assertMessageInStore(message, targetQueue);
+      assertEquals(2, message.getReferences().size());
+      assertTrue(message.getReferences().contains(ref0));
+      assertTrue(message.getReferences().contains(ref3));
+
+      // Delete the fourth reference: only 0 remains
+      pm.deleteReference(ref3);
+      assertMessageInStore(message, targetQueue);
+      assertEquals(1, message.getReferences().size());
+      assertTrue(message.getReferences().contains(ref0));
+
+      // Deleting the last reference must remove the message itself
+      pm.deleteReference(ref0);
+      assertMessageNotInStore(message);
+      assertStoreEmpty();
+   }
+   
+   /**
+    * Exercises commitTransaction() in four steps:
+    * <ol>
+    * <li>commit three messages (four references each) and check they are stored;</li>
+    * <li>commit two more messages while removing some references of the first
+    *     three; m1 and m3 must keep their remaining references and m2, which
+    *     loses all four, must disappear from the store;</li>
+    * <li>commit the removal of every remaining reference, after which none of
+    *     the five messages may be left in the store;</li>
+    * <li>commit with null for both arguments.</li>
+    * </ol>
+    */
+   public void testCommitTransaction() throws Exception
+   {
+      List<Message> msgs = new ArrayList<Message>();
+
+      Queue queue = new QueueImpl(67);
+
+      Message m1 = createMessageWithRefs(1, queue);
+      // Copy of the initial references, used later to check which ones survived
+      List<MessageReference> m1Refs = new ArrayList<MessageReference>(m1.getReferences());
+      msgs.add(m1);
+
+      Message m2 = createMessageWithRefs(2, queue);
+      msgs.add(m2);
+
+      Message m3 = createMessageWithRefs(3, queue);
+      // Copy of the initial references, used later to check which ones survived
+      List<MessageReference> m3Refs = new ArrayList<MessageReference>(m3.getReferences());
+      msgs.add(m3);
+
+      pm.commitTransaction(msgs, null);
+
+      assertMessageInStore(m1, queue);
+      assertMessageInStore(m2, queue);
+      assertMessageInStore(m3, queue);
+
+      //Add a couple more
+
+      List<Message> msgsMore = new ArrayList<Message>();
+
+      Message m4 = createMessageWithRefs(4, queue);
+      msgsMore.add(m4);
+
+      Message m5 = createMessageWithRefs(5, queue);
+      msgsMore.add(m5);
+
+      //Delete some refs
+
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+      refsToRemove.add(m1.getReferences().get(0));
+      refsToRemove.add(m1.getReferences().get(3));
+
+      // All four references of m2 go, so m2 itself must disappear
+      refsToRemove.add(m2.getReferences().get(0));
+      refsToRemove.add(m2.getReferences().get(1));
+      refsToRemove.add(m2.getReferences().get(2));
+      refsToRemove.add(m2.getReferences().get(3));
+
+      refsToRemove.add(m3.getReferences().get(2));
+
+      pm.commitTransaction(msgsMore, refsToRemove);
+
+      assertMessageInStore(m1, queue);
+      assertEquals(2, m1.getReferences().size());
+      assertTrue(m1.getReferences().contains(m1Refs.get(1)));
+      assertTrue(m1.getReferences().contains(m1Refs.get(2)));
+
+      assertMessageNotInStore(m2);
+
+      assertMessageInStore(m3, queue);
+      assertEquals(3, m3.getReferences().size());
+      assertTrue(m3.getReferences().contains(m3Refs.get(0)));
+      assertTrue(m3.getReferences().contains(m3Refs.get(1)));
+      assertTrue(m3.getReferences().contains(m3Refs.get(3)));
+
+      assertMessageInStore(m4, queue);
+      assertEquals(4, m4.getReferences().size());
+
+      assertMessageInStore(m5, queue);
+      assertEquals(4, m5.getReferences().size());
+
+      //Delete the rest
+      refsToRemove.clear();
+      refsToRemove.addAll(m1.getReferences());
+      refsToRemove.addAll(m3.getReferences());
+      refsToRemove.addAll(m4.getReferences());
+      refsToRemove.addAll(m5.getReferences());
+
+      pm.commitTransaction(null, refsToRemove);
+
+      // Every message must be gone now (m3 was previously never checked here,
+      // m5 was checked twice instead)
+      assertMessageNotInStore(m1);
+      assertMessageNotInStore(m2);
+      assertMessageNotInStore(m3);
+      assertMessageNotInStore(m4);
+      assertMessageNotInStore(m5);
+
+      //try with nulls
+      pm.commitTransaction(null, null);
+   }
+   
+   /**
+    * Same scenario as testCommitTransaction(), except that the second and later
+    * steps go through prepareTransaction() followed by commitPreparedTransaction():
+    * <ol>
+    * <li>commit three messages (four references each) and check they are stored;</li>
+    * <li>prepare and commit a transaction that adds two messages and removes some
+    *     references of the first three; m1 and m3 must keep their remaining
+    *     references and m2, which loses all four, must disappear from the store;</li>
+    * <li>prepare and commit the removal of every remaining reference, after which
+    *     none of the five messages may be left in the store;</li>
+    * <li>prepare and commit a transaction with null for both lists.</li>
+    * </ol>
+    */
+   public void testPrepareAndCommitTransaction() throws Exception
+   {
+      List<Message> msgs = new ArrayList<Message>();
+
+      Queue queue = new QueueImpl(67);
+
+      Message m1 = createMessageWithRefs(1, queue);
+      // Copy of the initial references, used later to check which ones survived
+      List<MessageReference> m1Refs = new ArrayList<MessageReference>(m1.getReferences());
+      msgs.add(m1);
+
+      Message m2 = createMessageWithRefs(2, queue);
+      msgs.add(m2);
+
+      Message m3 = createMessageWithRefs(3, queue);
+      // Copy of the initial references, used later to check which ones survived
+      List<MessageReference> m3Refs = new ArrayList<MessageReference>(m3.getReferences());
+      msgs.add(m3);
+
+      pm.commitTransaction(msgs, null);
+
+      assertMessageInStore(m1, queue);
+      assertMessageInStore(m2, queue);
+      assertMessageInStore(m3, queue);
+
+      //Add a couple more
+
+      List<Message> msgsMore = new ArrayList<Message>();
+
+      Message m4 = createMessageWithRefs(4, queue);
+      msgsMore.add(m4);
+
+      Message m5 = createMessageWithRefs(5, queue);
+      msgsMore.add(m5);
+
+      //Delete some refs
+
+      List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+      refsToRemove.add(m1.getReferences().get(0));
+      refsToRemove.add(m1.getReferences().get(3));
+
+      // All four references of m2 go, so m2 itself must disappear
+      refsToRemove.add(m2.getReferences().get(0));
+      refsToRemove.add(m2.getReferences().get(1));
+      refsToRemove.add(m2.getReferences().get(2));
+      refsToRemove.add(m2.getReferences().get(3));
+
+      refsToRemove.add(m3.getReferences().get(2));
+
+      Xid xid = generateXid();
+
+      pm.prepareTransaction(xid, msgsMore, refsToRemove);
+
+      pm.commitPreparedTransaction(xid);
+
+      assertMessageInStore(m1, queue);
+      assertEquals(2, m1.getReferences().size());
+      assertTrue(m1.getReferences().contains(m1Refs.get(1)));
+      assertTrue(m1.getReferences().contains(m1Refs.get(2)));
+
+      assertMessageNotInStore(m2);
+
+      assertMessageInStore(m3, queue);
+      assertEquals(3, m3.getReferences().size());
+      assertTrue(m3.getReferences().contains(m3Refs.get(0)));
+      assertTrue(m3.getReferences().contains(m3Refs.get(1)));
+      assertTrue(m3.getReferences().contains(m3Refs.get(3)));
+
+      assertMessageInStore(m4, queue);
+      assertEquals(4, m4.getReferences().size());
+
+      assertMessageInStore(m5, queue);
+      assertEquals(4, m5.getReferences().size());
+
+      //Delete the rest
+      refsToRemove.clear();
+      refsToRemove.addAll(m1.getReferences());
+      refsToRemove.addAll(m3.getReferences());
+      refsToRemove.addAll(m4.getReferences());
+      refsToRemove.addAll(m5.getReferences());
+
+      xid = generateXid();
+
+      pm.prepareTransaction(xid, null, refsToRemove);
+
+      pm.commitPreparedTransaction(xid);
+
+      // Every message must be gone now (m3 was previously never checked here,
+      // m5 was checked twice instead)
+      assertMessageNotInStore(m1);
+      assertMessageNotInStore(m2);
+      assertMessageNotInStore(m3);
+      assertMessageNotInStore(m4);
+      assertMessageNotInStore(m5);
+
+      //try with nulls
+      xid = generateXid();
+      pm.prepareTransaction(xid, null, null);
+      pm.commitPreparedTransaction(xid);
+   }
+   
+   /**
+    * Commits three messages, then prepares a transaction that would add two
+    * more messages and remove some references, and immediately unprepares it.
+    * Afterwards the store must contain exactly the three committed messages.
+    */
+   public void testPrepareAndUnprepareTransaction() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(67);
+
+      // Three messages committed up front
+      final List<Message> committed = new ArrayList<Message>();
+      final Message first = createMessageWithRefs(1, targetQueue);
+      committed.add(first);
+      final Message second = createMessageWithRefs(2, targetQueue);
+      committed.add(second);
+      final Message third = createMessageWithRefs(3, targetQueue);
+      committed.add(third);
+
+      pm.commitTransaction(committed, null);
+
+      assertMessageInStore(first, targetQueue);
+      assertMessageInStore(second, targetQueue);
+      assertMessageInStore(third, targetQueue);
+
+      // Work for the transaction that gets prepared and then unprepared:
+      // two messages to add ...
+      final List<Message> pendingAdds = new ArrayList<Message>();
+      final Message fourth = createMessageWithRefs(4, targetQueue);
+      pendingAdds.add(fourth);
+      final Message fifth = createMessageWithRefs(5, targetQueue);
+      pendingAdds.add(fifth);
+
+      // ... and seven references to remove
+      final List<MessageReference> pendingRemovals = new ArrayList<MessageReference>();
+      pendingRemovals.add(first.getReferences().get(0));
+      pendingRemovals.add(first.getReferences().get(3));
+      for (int i = 0; i < 4; i++)
+      {
+         pendingRemovals.add(second.getReferences().get(i));
+      }
+      pendingRemovals.add(third.getReferences().get(2));
+
+      final Xid txId = generateXid();
+
+      pm.prepareTransaction(txId, pendingAdds, pendingRemovals);
+      pm.unprepareTransaction(txId, pendingAdds, pendingRemovals);
+
+      assertNumMessagesInStore(3);
+   }
+   
+   /**
+    * Checks that all four references start with a delivery count of zero, then
+    * sets new delivery counts on the second and fourth reference, passes each to
+    * updateDeliveryCount(), and verifies that the stored message is equivalent
+    * to the in-memory one.
+    */
+   public void testUpdateDeliveryCount() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(67);
+      final Message message = createMessageWithRefs(1, targetQueue);
+
+      for (int i = 0; i < 4; i++)
+      {
+         assertEquals(0, message.getReferences().get(i).getDeliveryCount());
+      }
+
+      pm.addMessage(message);
+
+      final int firstCount = 77;
+      final MessageReference secondRef = message.getReferences().get(1);
+      secondRef.setDeliveryCount(firstCount);
+      pm.updateDeliveryCount(targetQueue, secondRef);
+
+      final int secondCount = 423;
+      final MessageReference fourthRef = message.getReferences().get(3);
+      fourthRef.setDeliveryCount(secondCount);
+      pm.updateDeliveryCount(targetQueue, fourthRef);
+
+      assertMessageInStore(message, targetQueue);
+   }
+   
+   /**
+    * Stores ten messages, each holding a reference generated for its own queue,
+    * and then checks that every message can be read back together with the queue
+    * of its first reference.
+    */
+   public void testRefsWithDifferentQueues() throws Exception
+   {
+      final int numQueues = 10;
+      final List<Message> stored = new ArrayList<Message>();
+
+      for (int i = 0; i < numQueues; i++)
+      {
+         Queue ownQueue = new QueueImpl(i);
+         MessageReference ref = generateReference(ownQueue, i);
+
+         stored.add(ref.getMessage());
+         pm.addMessage(ref.getMessage());
+
+         assertEquals(ownQueue, ref.getQueue());
+      }
+
+      for (int i = 0; i < stored.size(); i++)
+      {
+         Message message = stored.get(i);
+         assertMessageInStore(message, message.getReferences().get(0).getQueue());
+      }
+   }
+   
+   /**
+    * Creates ten queues and stores ten messages, each with one reference per
+    * queue, then calls loadQueues() and checks that every queue reports ten
+    * messages whose contents match the stored messages in the order they were added.
+    */
+   public void testLoadQueues() throws Exception
+   {
+      final int numQueues = 10;
+      final int numMessages = 10;
+
+      final Map<Long, Queue> queuesById = new HashMap<Long, Queue>();
+      for (int q = 0; q < numQueues; q++)
+      {
+         Queue created = new QueueImpl(q);
+         queuesById.put(created.getID(), created);
+      }
+
+      final List<Message> stored = new ArrayList<Message>();
+      for (int m = 0; m < numMessages; m++)
+      {
+         Message message = this.generateMessage(m);
+         stored.add(message);
+
+         // The loop variable is a long so that it boxes to Long, the map's key type
+         for (long queueId = 0; queueId < numQueues; queueId++)
+         {
+            message.createReference(queuesById.get(queueId));
+         }
+
+         pm.addMessage(message);
+      }
+
+      pm.loadQueues(queuesById);
+
+      for (Queue loaded: queuesById.values())
+      {
+         assertEquals(numMessages, loaded.getMessageCount());
+
+         List<MessageReference> refs = loaded.list(null);
+         for (int pos = 0; pos < refs.size(); pos++)
+         {
+            this.assertEquivalent(stored.get(pos), refs.get(pos).getMessage());
+         }
+      }
+   }
+   
+   /**
+    * Prepares two transactions one after the other and checks that
+    * getInDoubtXids(), called with recovery mode switched on, returns the
+    * prepared xids until both transactions have been committed.
+    */
+   public void testGetInDoubtXids() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(12);
+      final List<Message> toAdd = new ArrayList<Message>();
+
+      // First prepared transaction
+      final Message firstMessage = createMessageWithRefs(1, targetQueue);
+      toAdd.add(firstMessage);
+      final Xid firstXid = generateXid();
+      pm.prepareTransaction(firstXid, toAdd, null);
+
+      pm.setInRecoveryMode(true);
+
+      List<Xid> inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(1, inDoubt.size());
+      assertEquals(firstXid, inDoubt.get(0));
+
+      // Second prepared transaction; the same list is reused for the new message
+      final Message secondMessage = createMessageWithRefs(2, targetQueue);
+      toAdd.clear();
+      toAdd.add(secondMessage);
+      final Xid secondXid = generateXid();
+      pm.prepareTransaction(secondXid, toAdd, null);
+
+      inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(2, inDoubt.size());
+      assertTrue(inDoubt.contains(firstXid));
+      assertTrue(inDoubt.contains(secondXid));
+
+      // Once both are committed nothing may be reported any more
+      pm.commitPreparedTransaction(firstXid);
+      pm.commitPreparedTransaction(secondXid);
+
+      inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(0, inDoubt.size());
+   }
+   
+   /**
+    * Like testGetInDoubtXids(), but stops and restarts the persistence manager
+    * after the two transactions have been prepared. Both xids must still be
+    * returned by getInDoubtXids() after the restart, and none after both
+    * transactions have been committed.
+    */
+   public void testGetInDoubtXidsWithRestart() throws Exception
+   {
+      final Queue targetQueue = new QueueImpl(12);
+      final List<Message> toAdd = new ArrayList<Message>();
+
+      // First prepared transaction
+      final Message firstMessage = createMessageWithRefs(1, targetQueue);
+      toAdd.add(firstMessage);
+      final Xid firstXid = generateXid();
+      pm.prepareTransaction(firstXid, toAdd, null);
+
+      pm.setInRecoveryMode(true);
+
+      List<Xid> inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(1, inDoubt.size());
+      assertEquals(firstXid, inDoubt.get(0));
+
+      // Second prepared transaction; the same list is reused for the new message
+      final Message secondMessage = createMessageWithRefs(2, targetQueue);
+      toAdd.clear();
+      toAdd.add(secondMessage);
+      final Xid secondXid = generateXid();
+      pm.prepareTransaction(secondXid, toAdd, null);
+
+      inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(2, inDoubt.size());
+      assertTrue(inDoubt.contains(firstXid));
+      assertTrue(inDoubt.contains(secondXid));
+
+      // Restart the manager and switch recovery mode on again before querying
+      pm.stop();
+      pm.start();
+      pm.setInRecoveryMode(true);
+
+      inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(2, inDoubt.size());
+      assertTrue(inDoubt.contains(firstXid));
+      assertTrue(inDoubt.contains(secondXid));
+
+      // Once both are committed nothing may be reported any more
+      pm.commitPreparedTransaction(firstXid);
+      pm.commitPreparedTransaction(secondXid);
+
+      inDoubt = pm.getInDoubtXids();
+      assertNotNull(inDoubt);
+      assertEquals(0, inDoubt.size());
+   }
+   
+   /**
+    * Checks the recovery-mode flag: it starts out false, getInDoubtXids() must
+    * throw IllegalStateException while it is false, and setInRecoveryMode()
+    * toggles the value returned by isInRecoveryMode().
+    */
+   public void testSetGetRecoveryMode() throws Exception
+   {
+      assertFalse(pm.isInRecoveryMode());
+
+      // Outside recovery mode the call has to be rejected
+      boolean rejected = false;
+      try
+      {
+         pm.getInDoubtXids();
+      }
+      catch (IllegalStateException expected)
+      {
+         rejected = true;
+      }
+      if (!rejected)
+      {
+         fail("Should throw exception");
+      }
+
+      pm.setInRecoveryMode(true);
+      assertTrue(pm.isInRecoveryMode());
+
+      // In recovery mode the same call must go through
+      pm.getInDoubtXids();
+
+      pm.setInRecoveryMode(false);
+      assertFalse(pm.isInRecoveryMode());
+   }
+   
+   // Private --------------------------------------------------------------------
+   
+   private Message extractMessage(Map<Long, Queue> queues, long id, byte[] msgBytes, byte[] refBytes) throws Exception
+   {
+      ByteBuffer buffer = ByteBuffer.wrap(msgBytes);
+      
+      int type = buffer.getInt();
+      
+      long expiration = buffer.getLong();
+      
+      long timestamp = buffer.getLong();
+      
+      byte priority = buffer.get();
+      
+      int headerSize = buffer.getInt();
+      
+      byte[] headers = new byte[headerSize];
+      
+      buffer.get(headers);
+      
+      int payloadSize = buffer.getInt();
+      
+      byte[] payload = null;
+      
+      if (payloadSize != 0)
+      {
+         payload = new byte[payloadSize];
+         
+         buffer.get(payload);
+      }
+      
+      Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
+                                        headers, payload);
+      
+      buffer = ByteBuffer.wrap(refBytes);
+      
+      while (buffer.hasRemaining())
+      {
+         long queueID = buffer.getLong();
+         
+         int deliveryCount = buffer.getInt();
+         
+         long scheduledDeliveryTime = buffer.getLong();
+         
+         MessageReference reference = message.createReference(queues.get(queueID));
+         
+         reference.setDeliveryCount(deliveryCount);
+         
+         reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+      } 
+      
+      return message;
+   }
+   
+   private void assertMessageInStore(Message m, Queue queue) throws Exception
+   {
+      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+      
+      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+      
+      byte[] msgBytes = msgDB.get(m.getMessageID());
+      
+      assertNotNull(msgBytes);
+      
+      byte[] refBytes = refDB.get(m.getMessageID());
+      
+      assertNotNull(refBytes);
+      
+      Map<Long, Queue> queues = new HashMap<Long, Queue>();
+      
+      queues.put(queue.getID(), queue);
+      
+      Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
+       
+      assertEquivalent(m, m2);               
+   }
+   
+   /**
+    * Asserts that the message database holds exactly the given number of entries.
+    *
+    * @param num the expected number of entries
+    */
+   private void assertNumMessagesInStore(int num) throws Exception
+   {
+      assertEquals(num, bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME).size());
+   }
+   
+   private void assertMessageNotInStore(Message m) throws Exception
+   {
+      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+      
+      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+      
+      
+      byte[] msgBytes = msgDB.get(m.getMessageID());
+      
+      assertNull(msgBytes);
+      
+      byte[] refBytes = refDB.get(m.getMessageID());
+      
+      assertNull(refBytes);         
+   }
+   
+   private void assertStoreEmpty() throws Exception
+   {
+      BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+      
+      BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+      
+      assertEquals(0, msgDB.size());
+      
+      assertEquals(0, refDB.size());
+   }
+   
+   private Message createMessageWithRefs(long id, Queue queue)
+   {
+      Message m = generateMessage(id);
+      
+      m.createReference(queue);
+      
+      m.createReference(queue);
+      
+      m.createReference(queue);
+      
+      m.createReference(queue);
+      
+      return m;
+   }
+   
+
+   // Inner classes ---------------------------------------------------------------
+        
+}




More information about the jboss-cvs-commits mailing list