[jboss-cvs] JBoss Messaging SVN: r4689 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 17 11:34:33 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-17 11:34:33 -0400 (Thu, 17 Jul 2008)
New Revision: 4689

Modified:
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
Log:
Journal work (treating holes on transactions)

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-07-17 15:34:33 UTC (rev 4689)
@@ -24,6 +24,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -32,7 +33,6 @@
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.asyncio.AsynchronousFile;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.util.VariableLatch;
 
 
 /**
@@ -116,7 +116,6 @@
 	private String fileName;
 	private Thread poller;	
 	private int maxIO;	
-	private VariableLatch writeLatch = new VariableLatch();	
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
 	private Lock writeLock = lock.writeLock();
    private Semaphore writeSemaphore;   
@@ -164,7 +163,10 @@
 		try
 		{
 	      writeLock.lock();
-	      writeLatch.waitCompletion(timeout);
+	      if (!writeSemaphore.tryAcquire(maxIO, timeout, TimeUnit.MILLISECONDS))
+	      {
+	         throw new IllegalStateException("Timeout!");
+	      }
 	      writeSemaphore = null;
 	      stopPoller(handler);
 	      // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
@@ -184,7 +186,6 @@
 	public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
-		writeLatch.up();
       writeSemaphore.acquireUninterruptibly();
 		try
 		{
@@ -193,7 +194,6 @@
 		catch (RuntimeException e)
 		{
          writeSemaphore.release();
-	      writeLatch.down();
 			throw e;
 		}
 		
@@ -202,7 +202,6 @@
 	public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
-		writeLatch.up();
       writeSemaphore.acquireUninterruptibly();
 		try
 		{
@@ -211,7 +210,6 @@
 		catch (RuntimeException e)
 		{
          writeSemaphore.release();
-		   writeLatch.down();
 			throw e;
 		}		
 	}
@@ -242,15 +240,14 @@
 	private void callbackDone(final AIOCallback callback)
 	{
       writeSemaphore.release();
-		writeLatch.down();
 		callback.done();
 	}
 	
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
 	private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
 	{
+	   log.warn("CallbackError: " + errorMessage);
       writeSemaphore.release();
-      writeLatch.down();
 		callback.onError(errorCode, errorMessage);
 	}
 	

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2008-07-17 15:34:33 UTC (rev 4689)
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
@@ -51,7 +52,7 @@
 	
 	private boolean canReclaim;
 	
-	private Map<JournalFile, Integer> negCounts = new HashMap<JournalFile, Integer>();
+	private Map<JournalFile, Integer> negCounts = new ConcurrentHashMap<JournalFile, Integer>();
 	
 	public JournalFileImpl(final SequentialFile file, final int orderingID)
 	{

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-17 15:34:33 UTC (rev 4689)
@@ -82,8 +82,6 @@
    private static final int SIZE_LONG = 8;
    
    private static final int SIZE_INT = 4;
-
-   private static final int SIZE_SHORT = 2;
    
    private static final int SIZE_BYTE = 1;
    
@@ -120,16 +118,15 @@
    
    public static final byte DELETE_RECORD_TX = 16;
    
-   public static final int SIZE_PREPARE_RECORD = BASIC_SIZE + SIZE_LONG;
+   public static final int SIZE_PREPARE_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
    
    public static final byte PREPARE_RECORD = 17;
    
+   public static final int SIZE_COMMIT_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
    
-   public static final int SIZE_COMMIT_RECORD = BASIC_SIZE + SIZE_LONG;
-   
    public static final byte COMMIT_RECORD = 18;
    
-   public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG;
+   public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
    
    public static final byte ROLLBACK_RECORD = 19;
    
@@ -170,7 +167,7 @@
    
    private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
    
-   private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
+   private final Map<Long, JournalTransaction> transactionInfos = new ConcurrentHashMap<Long, JournalTransaction>();
    
    private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
    
@@ -195,12 +192,12 @@
    
    private volatile int state;
    
-   private volatile long lastOrderingID;
-   
    private final AtomicLong transactionIDSequence = new AtomicLong(0);
    
    private Reclaimer reclaimer = new Reclaimer();
    
+   private Thread shutdownHook = null;
+   
    // Static --------------------------------------------------------
    
    private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -438,13 +435,11 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+      JournalTransaction tx = getTransactionInfo(txID);
       
-      TransactionNegPos tx = getTransactionInfo(txID);
-      
-      tx.addPos(usedFile, id);
+      tx.addPositive(usedFile, id);
    }
    
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -468,13 +463,11 @@
       bb.putInt(size);
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+      JournalTransaction tx = getTransactionInfo(txID);
       
-      TransactionNegPos tx = getTransactionInfo(txID);
-      
-      tx.addPos(usedFile, id);
+      tx.addPositive(usedFile, id);
    }
    
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
@@ -498,13 +491,11 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+      JournalTransaction tx = getTransactionInfo(txID);
       
-      TransactionNegPos tx = getTransactionInfo(txID);
-      
-      tx.addPos(usedFile, id);
+      tx.addPositive(usedFile, id);
    }
    
    public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
@@ -529,13 +520,11 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
+      JournalTransaction tx = getTransactionInfo(txID);
       
-      TransactionNegPos tx = getTransactionInfo(txID);
-      
-      tx.addPos(usedFile, id);
+      tx.addPositive(usedFile, id);
    }
    
    public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -556,13 +545,11 @@
       bb.putInt(size);     
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb, false, getTransactionCallback(txID));
+      JournalTransaction tx = getTransactionInfo(txID);
       
-      TransactionNegPos tx = getTransactionInfo(txID);
-      
-      tx.addNeg(usedFile, id);      
+      tx.addNegative(usedFile, id);      
    }  
    
    public void appendPrepareRecord(final long txID) throws Exception
@@ -572,7 +559,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
       
-      TransactionNegPos tx = transactionInfos.get(txID);
+      JournalTransaction tx = transactionInfos.get(txID);
       
       if (tx == null)
       {
@@ -586,13 +573,12 @@
       bb.put(PREPARE_RECORD);    
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
       bb.putLong(txID);
+      bb.putInt(tx.getNumberOfElements());
       bb.putInt(size);           
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-      
       tx.prepare(usedFile);
    }
    
@@ -603,7 +589,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }		
       
-      TransactionNegPos tx = transactionInfos.remove(txID);
+      JournalTransaction tx = transactionInfos.remove(txID);
       
       if (tx == null)
       {
@@ -616,14 +602,13 @@
       
       bb.put(COMMIT_RECORD);     
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
-      bb.putLong(txID);    
+      bb.putLong(txID);
+      bb.putInt(tx.getNumberOfElements());
       bb.putInt(size);           
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
       
-      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-      
       transactionCallbacks.remove(txID);
       
       tx.commit(usedFile);
@@ -637,7 +622,7 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
       
-      TransactionNegPos tx = transactionInfos.remove(txID);
+      JournalTransaction tx = transactionInfos.remove(txID);
       
       if (tx == null)
       {
@@ -651,13 +636,12 @@
       bb.put(ROLLBACK_RECORD);      
       bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
       bb.putLong(txID);
+      bb.putInt(tx.getNumberOfElements());
       bb.putInt(size);        
       bb.rewind();
       
-      JournalFile usedFile;
+      JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
       
-      usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));      
-      
       transactionCallbacks.remove(txID);
       
       tx.rollback(usedFile);
@@ -671,6 +655,8 @@
          throw new IllegalStateException("Journal must be in started state");
       }
       
+      addShutdownHook();
+      
       Set<Long> recordsToDelete = new HashSet<Long>();
       
       Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
@@ -934,16 +920,16 @@
                   
                   tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));                     
                   
-                  TransactionNegPos tnp = transactionInfos.get(txID);
+                  JournalTransaction tnp = transactionInfos.get(txID);
                   
                   if (tnp == null)
                   {
-                     tnp = new TransactionNegPos();
+                     tnp = new JournalTransaction();
                      
                      transactionInfos.put(txID, tnp);
                   }
                   
-                  tnp.addPos(file, id);
+                  tnp.addPositive(file, id);
                   
                   hasData = true;                                          
                   
@@ -972,16 +958,16 @@
                   
                   tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
                   
-                  TransactionNegPos tnp = transactionInfos.get(txID);
+                  JournalTransaction tnp = transactionInfos.get(txID);
                   
                   if (tnp == null)
                   {
-                     tnp = new TransactionNegPos();
+                     tnp = new JournalTransaction();
                      
                      transactionInfos.put(txID, tnp);
                   }
                   
-                  tnp.addPos(file, id);
+                  tnp.addPositive(file, id);
                   
                   hasData = true;                     
                   
@@ -1004,16 +990,16 @@
                   
                   tx.recordsToDelete.add(id);                     
                   
-                  TransactionNegPos tnp = transactionInfos.get(txID);
+                  JournalTransaction tnp = transactionInfos.get(txID);
                   
                   if (tnp == null)
                   {
-                     tnp = new TransactionNegPos();
+                     tnp = new JournalTransaction();
                      
                      transactionInfos.put(txID, tnp);
                   }
                   
-                  tnp.addNeg(file, id);
+                  tnp.addNegative(file, id);
                   
                   hasData = true;                     
                   
@@ -1021,7 +1007,8 @@
                }  
                case PREPARE_RECORD:
                {
-                  long txID = bb.getLong();           
+                  long txID = bb.getLong();
+                  int numberOfElements = bb.getInt();
                   
                   maxTransactionID = Math.max(maxTransactionID, txID);                 
 
@@ -1034,39 +1021,56 @@
                   
                   tx.prepared = true;
                   
-                  TransactionNegPos tnp = transactionInfos.get(txID);
+                  JournalTransaction journalTransaction = transactionInfos.get(txID);
                   
-                  if (tnp == null)
+                  if (journalTransaction == null)
                   {
                      throw new IllegalStateException("Cannot find tx " + txID);
                   }
+
+                  if (numberOfElements == journalTransaction.getNumberOfElements())
+                  {
+                     journalTransaction.prepare(file);
+                  }
+                  else
+                  {
+                     journalTransaction.setInvalid(true);
+                     tx.invalid = true;
+                  }
                   
-                  tnp.prepare(file);   
-                  
                   hasData = true;         
                   
                   break;
                }
                case COMMIT_RECORD:
                {
-                  long txID = bb.getLong();  
+                  long txID = bb.getLong();
+                  int numberOfElements = bb.getInt();
                   
                   maxTransactionID = Math.max(maxTransactionID, txID);
                   TransactionHolder tx = transactions.remove(txID);
                   
                   if (tx != null)
                   {
-                     records.addAll(tx.recordInfos);                    
-                     recordsToDelete.addAll(tx.recordsToDelete);  
                      
-                     TransactionNegPos tnp = transactionInfos.remove(txID);
+                     JournalTransaction tnp = transactionInfos.remove(txID);
                      
                      if (tnp == null)
                      {
                         throw new IllegalStateException("Cannot find tx " + txID);
                      }
                      
-                     tnp.commit(file);       
+                     if (numberOfElements == tnp.getNumberOfElements())
+                     {
+                        records.addAll(tx.recordInfos);                    
+                        recordsToDelete.addAll(tx.recordsToDelete);  
+                        tnp.commit(file);       
+                     }
+                     else
+                     {
+                        log.warn("Transaction " + txID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
+                        tnp.rollback(file);
+                     }
                      
                      hasData = true;         
                   }
@@ -1075,7 +1079,8 @@
                }
                case ROLLBACK_RECORD:
                {
-                  long txID = bb.getLong();     
+                  long txID = bb.getLong();
+                  /* int numberOfElements = */ bb.getInt(); // Not being currently used
                   
                   maxTransactionID = Math.max(maxTransactionID, txID);                 
                   
@@ -1083,7 +1088,7 @@
                   
                   if (tx != null)
                   {                       
-                     TransactionNegPos tnp = transactionInfos.remove(txID);
+                     JournalTransaction tnp = transactionInfos.remove(txID);
                      
                      if (tnp == null)
                      {
@@ -1185,11 +1190,11 @@
       
       for (TransactionHolder transaction: transactions.values())
       {
-         if (!transaction.prepared)
+         if (!transaction.prepared || transaction.invalid)
          {
             log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
             
-            TransactionNegPos transactionInfo = this.transactionInfos.get(transaction.transactionID);
+            JournalTransaction transactionInfo = this.transactionInfos.get(transaction.transactionID);
             
             if (transactionInfo == null)
             {
@@ -1224,7 +1229,7 @@
       return this.fileFactory.getAlignment();
    }
    
-   public synchronized void checkReclaimStatus() throws Exception
+   public void checkReclaimStatus() throws Exception
    {
       JournalFile[] files = new JournalFile[dataFiles.size()];
       
@@ -1465,10 +1470,13 @@
       this.closingExecutor = Executors.newSingleThreadExecutor();
       
       state = STATE_STARTED;
+
    }
    
    public synchronized void stop() throws Exception
    {
+      clearShutdownHook();
+
       if (state == STATE_STOPPED)
       {
          throw new IllegalStateException("Journal is already stopped");
@@ -1529,6 +1537,46 @@
    
    // Private -----------------------------------------------------------------------------
    
+   private void clearShutdownHook()
+   {
+      if (shutdownHook != null)
+      {
+         try
+         {
+            Runtime.getRuntime().removeShutdownHook(shutdownHook);
+         }
+         catch (Throwable e)
+         {
+         }
+         shutdownHook = null;
+      }
+   }
+   
+   private void addShutdownHook()
+   {
+      
+      clearShutdownHook();
+      
+      
+      shutdownHook = new Thread()
+      {
+        public void run()
+        {
+           try
+           {
+              log.info("Journal being stopped");
+              JournalImpl.this.stop();
+           }
+           catch (Exception e)
+           {
+              log.warn(e, e);
+           }
+        }
+      };
+      Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+   }
+   
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {
       lock.acquire();
@@ -1562,17 +1610,6 @@
       }
    }
    
-   private void repairFrom(final int pos, final JournalFile file) throws Exception
-   {
-      log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
-            " in the record that starts at position " + pos + ". " + 
-      "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-      
-      file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
-      
-      file.getFile().position(pos);
-   }
-   
    private JournalFile createFile(boolean keepOpened) throws Exception
    {
       int orderingID = generateOrderingID();
@@ -1658,10 +1695,6 @@
             try
             {
                pushOpenedFile();
-               if (autoReclaim)
-               {
-                  checkAndReclaimFiles();
-               }
             }
             catch (Exception e)
             {
@@ -1669,6 +1702,23 @@
             }
          }
       });
+      if (autoReclaim)
+      {
+         openExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                     checkAndReclaimFiles();
+               }
+               catch (Exception e)
+               {
+                  log.error(e.getMessage(), e);
+               }
+            }
+         });
+      }
       
       JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
       
@@ -1726,13 +1776,13 @@
       });
    }
    
-   private TransactionNegPos getTransactionInfo(final long txID)
+   private JournalTransaction getTransactionInfo(final long txID)
    {
-      TransactionNegPos tx = transactionInfos.get(txID);
+      JournalTransaction tx = transactionInfos.get(txID);
       
       if (tx == null)
       {
-         tx = new TransactionNegPos();
+         tx = new JournalTransaction();
          
          transactionInfos.put(txID, tx);
       }
@@ -1839,7 +1889,7 @@
       }
    }
    
-   private class TransactionNegPos
+   private class JournalTransaction
    {
       private List<Pair<JournalFile, Long>> pos;
       
@@ -1847,25 +1897,31 @@
       
       private Set<JournalFile> transactionPos;
       
-      void addTXPosCount(final JournalFile file)
+      // Number of elements participating on the transaction
+      // Used to verify completion on reload
+      private final AtomicInteger numberOfElements = new AtomicInteger(0);
+      
+      private boolean invalid = false;
+      
+      public int getNumberOfElements()
       {
-         if (transactionPos == null)
-         {
-            transactionPos = new HashSet<JournalFile>();
-         }
-         
-         if (!transactionPos.contains(file))
-         {
-            transactionPos.add(file);
-            
-            //We add a pos for the transaction itself in the file - this prevents any transactional operations
-            //being deleted before a commit or rollback is written
-            file.incPosCount();
-         }  
+         return numberOfElements.get();
       }
       
-      void addPos(final JournalFile file, final long id)
-      {     
+      public void setInvalid(boolean b)
+      {
+         this.invalid = b;
+      }
+      
+      public boolean isInvalid()
+      {
+         return this.invalid;
+      }
+
+      public void addPositive(final JournalFile file, final long id)
+      {
+         numberOfElements.incrementAndGet();
+
          addTXPosCount(file);          
          
          if (pos == null)
@@ -1876,8 +1932,10 @@
          pos.add(new Pair<JournalFile, Long>(file, id));
       }
       
-      void addNeg(final JournalFile file, final long id)
+      public void addNegative(final JournalFile file, final long id)
       {        
+         numberOfElements.incrementAndGet();
+
          addTXPosCount(file);    
          
          if (neg == null)
@@ -1888,7 +1946,7 @@
          neg.add(new Pair<JournalFile, Long>(file, id));       
       }
       
-      void commit(final JournalFile file)
+      public void commit(final JournalFile file)
       {        
          if (pos != null)
          {
@@ -1932,7 +1990,7 @@
          }        
       }
       
-      void rollback(JournalFile file)
+      public void rollback(JournalFile file)
       {     
          //Now add negs for the pos we added in each file in which there were transactional operations
          //Note that we do this on rollback as we do on commit, since we need to ensure the file containing
@@ -1946,14 +2004,14 @@
          }
       }
       
-      void prepare(JournalFile file)
+      public void prepare(JournalFile file)
       {
          //We don't want the prepare record getting deleted before time
          
          addTXPosCount(file);
       }
       
-      void forget()
+      public void forget()
       {
          //The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
          
@@ -1962,6 +2020,25 @@
             jf.decPosCount();
          }
       }
+      
+      private void addTXPosCount(final JournalFile file)
+      {
+         if (transactionPos == null)
+         {
+            transactionPos = new HashSet<JournalFile>();
+         }
+         
+         if (!transactionPos.contains(file))
+         {
+            transactionPos.add(file);
+            
+            //We add a pos for the transaction itself in the file - this prevents any transactional operations
+            //being deleted before a commit or rollback is written
+            file.incPosCount();
+         }  
+      }
+      
+      
    }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-07-17 15:34:33 UTC (rev 4689)
@@ -34,6 +34,7 @@
  * A TransactionHolder
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 
@@ -52,4 +53,6 @@
 	
 	public boolean prepared;
 	
+	public boolean invalid;
+	
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-07-17 14:42:00 UTC (rev 4688)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java	2008-07-17 15:34:33 UTC (rev 4689)
@@ -141,9 +141,16 @@
             /* ID */15l, 
             JournalImpl.SIZE_DELETE_RECORD_TX)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_DELETE_RECORD_TX);
       
+      EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.PREPARE_RECORD, 
+            /*FileID*/1, 
+            /* Transaction ID*/ 100l,
+            /* Number of Elements */ 1,
+            JournalImpl.SIZE_PREPARE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_PREPARE_RECORD);
+      
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD, 
             /*FileID*/1, 
             /* Transaction ID*/ 100l,
+            /* Number of Elements */ 1,
             JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
       
       EasyMock.replay(mockFactory, file1, file2);
@@ -152,6 +159,8 @@
       
       journalImpl.appendDeleteRecordTransactional(100l, 15l);
       
+      journalImpl.appendPrepareRecord(100l);
+      
       journalImpl.appendCommitRecord(100l);
       
       EasyMock.verify(mockFactory, file1, file2);
@@ -190,11 +199,13 @@
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.PREPARE_RECORD, 
             /*FileID*/1, 
             /* TXID */ 3l,
+            /* Number Of Elements */ 2,
             JournalImpl.SIZE_PREPARE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_PREPARE_RECORD);
       
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD, 
             /*FileID*/1, 
             /* TXID */ 3l,
+            /* Number Of Elements */ 2,
             JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
 
       EasyMock.replay(mockFactory, file1, file2);
@@ -228,6 +239,7 @@
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ROLLBACK_RECORD, 
             /*FileID*/1, 
             /* TXID */ 3l,
+            /* NumberOfElements */ 1,
             JournalImpl.SIZE_ROLLBACK_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ROLLBACK_RECORD);
 
       EasyMock.replay(mockFactory, file1, file2);
@@ -313,6 +325,7 @@
       EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD, 
             /*FileID*/1, 
             /* Transaction ID*/ 33l,
+            /* NumberOfElements*/ 2,
             JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
       
       EasyMock.replay(mockFactory, file1, file2);
@@ -446,6 +459,7 @@
 
          public void appendTo(StringBuffer buffer)
          {
+            buffer.append("ByteArray");
          }
 
          public boolean matches(Object argument)




More information about the jboss-cvs-commits mailing list