[jboss-cvs] JBoss Messaging SVN: r7446 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/journal/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 23 18:48:10 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-23 18:48:09 -0400 (Tue, 23 Jun 2009)
New Revision: 7446

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java
Log:
Individualizing callbacks (per file)

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -50,19 +50,19 @@
 
    // Transactional operations
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
+   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id, byte[] record, boolean sync) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record, boolean sync) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id, boolean sync) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id) throws Exception;
 
    void appendCommitRecord(long txID, boolean sync) throws Exception;
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -305,7 +305,7 @@
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, null, callback);
 
             recordsRelationshipMap.put(id, new RecordFilesRelationship(usedFile));
          }
@@ -368,7 +368,7 @@
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, null, callback);
 
             posFiles.addUpdateFile(usedFile);
          }
@@ -423,7 +423,7 @@
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, null, callback);
 
             posFiles.addDelete(usedFile);
          }
@@ -446,18 +446,16 @@
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final byte[] record,
-                                            final boolean sync) throws Exception
+                                            final byte[] record) throws Exception
    {
-      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
+      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
 
    }
 
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record,
-                                            final boolean sync) throws Exception
+                                            final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -489,7 +487,7 @@
          {
             JournalTransaction tx = getTransactionInfo(txID);
 
-            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+            JournalFile usedFile = appendRecord(bb, false, tx, null);
 
             tx.addPositive(usedFile, id);
          }
@@ -507,17 +505,15 @@
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final byte[] record,
-                                               final boolean sync) throws Exception
+                                               final byte[] record) throws Exception
    {
-      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
+      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
    }
 
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final EncodingSupport record,
-                                               final boolean sync) throws Exception
+                                               final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -547,7 +543,7 @@
          {
             JournalTransaction tx = getTransactionInfo(txID);
 
-            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+            JournalFile usedFile = appendRecord(bb, false, tx, null);
 
             tx.addPositive(usedFile, id);
          }
@@ -562,15 +558,14 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record, final boolean sync) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
    {
-      appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
+      appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
    }
 
    public void appendDeleteRecordTransactional(final long txID,
                                                final long id,
-                                               final EncodingSupport record,
-                                               final boolean sync) throws Exception
+                                               final EncodingSupport record) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -601,7 +596,7 @@
          {
             JournalTransaction tx = getTransactionInfo(txID);
 
-            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+            JournalFile usedFile = appendRecord(bb, false, tx, null);
 
             tx.addNegative(usedFile, id);
          }
@@ -616,7 +611,7 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final boolean sync) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -643,7 +638,7 @@
          {
             JournalTransaction tx = getTransactionInfo(txID);
 
-            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+            JournalFile usedFile = appendRecord(bb, false, tx, null);
 
             tx.addNegative(usedFile, id);
          }
@@ -679,22 +674,24 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      JournalTransaction tx = getTransactionInfo(txID);
+
+      if (sync)
+      {
+         tx.syncPreviousFiles();
+      }
+
       readLockCompact.lock();
 
-      IOCallback callback = null;
-
       try
       {
-         JournalTransaction tx = getTransactionInfo(txID);
 
          ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
 
-         callback = getTransactionCallback(tx, sync);
-
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, tx, null);
 
             tx.prepare(usedFile);
          }
@@ -710,11 +707,7 @@
       }
 
       // We should wait this outside of the lock, to increase throughput
-      if (callback != null)
-      {
-         callback.waitCompletion();
-      }
-
+      tx.waitCompletion();
    }
 
    /**
@@ -741,15 +734,18 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
+      JournalTransaction tx = transactionInfos.remove(txID);
+
+      if (sync)
+      {
+         tx.syncPreviousFiles();
+      }
+
       readLockCompact.lock();
 
-      IOCallback callback = null;
-
       try
       {
 
-         JournalTransaction tx = transactionInfos.remove(txID);
-
          if (tx == null)
          {
             throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -757,12 +753,10 @@
 
          ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
 
-         callback = getTransactionCallback(tx, sync);
-
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, tx, null);
 
             tx.commit(usedFile);
          }
@@ -777,10 +771,10 @@
          readLockCompact.unlock();
       }
 
-      // We should wait this outside of the lock, to increase throuput
-      if (callback != null)
+      if (sync)
       {
-         callback.waitCompletion();
+         // We should wait this outside of the lock, to increase throuput
+         tx.waitCompletion();
       }
    }
 
@@ -791,13 +785,13 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      IOCallback callback = null;
-
       readLockCompact.lock();
 
+      JournalTransaction tx = null;
+
       try
       {
-         JournalTransaction tx = transactionInfos.remove(txID);
+         tx = transactionInfos.remove(txID);
 
          if (tx == null)
          {
@@ -813,12 +807,10 @@
          bb.writeLong(txID);
          bb.writeInt(size);
 
-         callback = getTransactionCallback(tx, sync);
-
          lock.acquire();
          try
          {
-            JournalFile usedFile = appendRecord(bb, sync, callback);
+            JournalFile usedFile = appendRecord(bb, sync, tx, null);
 
             tx.rollback(usedFile);
          }
@@ -834,9 +826,10 @@
       }
 
       // We should wait this outside of the lock, to increase throuput
-      if (callback != null)
+
+      if (sync)
       {
-         callback.waitCompletion();
+         tx.waitCompletion();
       }
 
    }
@@ -934,7 +927,7 @@
       }
 
       final ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = recordsSnapshotList;
-      
+
       Compactor compactor = new Compactor(recordsSnapshot, dataFilesToProcess.get(0).getFileID());
 
       for (final JournalFile file : dataFilesToProcess)
@@ -960,11 +953,11 @@
    class Compactor implements JournalReader
    {
       JournalFile currentOutputFile;
-      
+
       SequentialFile sequentialFile;
 
       ByteBuffer bufferWrite;
-      
+
       int nextOrderingID;
 
       ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot;
@@ -1001,7 +994,7 @@
             sequentialFile.write(bufferWrite, true);
             sequentialFile.close();
          }
-         
+
          bufferWrite = fileFactory.newBuffer(fileSize);
          currentOutputFile = openFile(false);
          sequentialFile = currentOutputFile.getFile();
@@ -1578,10 +1571,7 @@
 
       for (JournalTransaction tx : transactionInfos.values())
       {
-         if (tx.getCallback() != null)
-         {
-            tx.getCallback().waitCompletion();
-         }
+         tx.waitCallbacks();
       }
 
       if (filesExecutor != null && !filesExecutor.isShutdown())
@@ -2349,7 +2339,7 @@
          file.read(bb);
 
          int fileID = bb.getInt();
-         
+
          int orderingID = bb.getInt();
 
          fileFactory.releaseBuffer(bb);
@@ -2375,7 +2365,7 @@
          {
             int oid1 = f1.getOrderingID();
             int oid2 = f2.getOrderingID();
-            
+
             if (oid1 == oid2)
             {
                int id1 = f1.getFileID();
@@ -2397,7 +2387,10 @@
    /** 
     * Note: You should aways guarantee locking the semaphore lock.
     * */
-   private JournalFile appendRecord(final MessagingBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+   private JournalFile appendRecord(final MessagingBuffer bb,
+                                    final boolean sync,
+                                    final JournalTransaction tx,
+                                    IOCallback callback) throws Exception
    {
       try
       {
@@ -2433,9 +2426,34 @@
 
          if (currentFile == null)
          {
-            throw new IllegalStateException("Current file = null");
+            throw new NullPointerException("Current file = null");
          }
 
+         if (tx != null)
+         {
+            if (callback != null)
+            {
+               // sanity check, it should not happen.
+               throw new IllegalArgumentException("Invalid callback parameter. Use of tx is mutually exclusive with the callback");
+            }
+
+            // The callback of a transaction has to be taken inside the lock,
+            // when we guarantee the currentFile will not be changed,
+            // since we individualize the callback per file
+            callback = tx.getCallback(currentFile);
+            
+            if (sync)
+            {
+               // We already did sync previous files outside of the lock,
+               // but in a very rare occasion (maybe in a low speed disk)
+               // you could have a race where the currentFile changed between the last sync to the time the lock was acquired.
+               // So, we call the syncPreviousFiles again to guarantee data on disk.
+               // Even if there is data to be synced, this should be very fast since previous files were already scheduled to be closed.
+               // This is just verifying if previous files are already closed
+               tx.syncPreviousFiles();
+            }
+         }
+
          bb.writerIndex(SIZE_BYTE);
 
          bb.writeInt(currentFile.getFileID());
@@ -2691,34 +2709,6 @@
       }
    }
 
-   private IOCallback getTransactionCallback(final JournalTransaction tx, final boolean sync) throws MessagingException
-   {
-      if (sync && fileFactory.isSupportsCallbacks())
-      {
-         TransactionCallback callback = tx.getCallback();
-
-         if (callback == null)
-         {
-            callback = new TransactionCallback();
-
-            tx.setCallback(callback);
-         }
-
-         if (callback.errorMessage != null)
-         {
-            throw new MessagingException(callback.errorCode, callback.errorMessage);
-         }
-
-         callback.countUp();
-
-         return callback;
-      }
-      else
-      {
-         return null;
-      }
-   }
-
    public ChannelBuffer newBuffer(final int size)
    {
       return ChannelBuffers.buffer(size);
@@ -2834,8 +2824,6 @@
 
    private class JournalTransaction
    {
-      private TransactionCallback callback;
-
       private List<Pair<JournalFile, Long>> pos;
 
       private List<Pair<JournalFile, Long>> neg;
@@ -2844,30 +2832,87 @@
       // We can't have those files being reclaimed or compacted if there is a pending transaction
       private Set<JournalFile> pendingFiles;
 
+      private TransactionCallback currentCallback;
+
       // Map of file id to number of elements participating on the transaction
       // in that file
       // Used to verify completion on reload
       private final Map<Integer, AtomicInteger> numberOfElementsPerFile = new HashMap<Integer, AtomicInteger>();
 
+      private Map<JournalFile, TransactionCallback> callbackList;
+
       public Map<Integer, AtomicInteger> getElementsSummary()
       {
          return numberOfElementsPerFile;
       }
 
-      /**
-       * @param callback
-       */
-      public void setCallback(TransactionCallback callback)
+      /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
+       *  Because of that, this operation should be almost very fast.*/
+      public void syncPreviousFiles() throws Exception
       {
-         this.callback = callback;
+         if (fileFactory.isSupportsCallbacks())
+         {
+            if (callbackList != null)
+            {
+               for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
+               {
+                  if (entry.getKey() != currentFile)
+                  {
+                     entry.getValue().waitCompletion();
+                  }
+               }
+            }
+         }
+         else
+         {
+            for (JournalFile file: pendingFiles)
+            {
+               if (file != currentFile)
+               {
+                   file.getFile().waitForClose();
+               }
+            }
+         }
       }
 
+      public TransactionCallback getCurrentCallback()
+      {
+         return currentCallback;
+      }
+
       /**
        * @return
        */
-      public TransactionCallback getCallback()
+      public TransactionCallback getCallback(JournalFile file) throws Exception
       {
-         return this.callback;
+         if (fileFactory.isSupportsCallbacks())
+         {
+            if (callbackList == null)
+            {
+               callbackList = new HashMap<JournalFile, TransactionCallback>();
+            }
+
+            currentCallback = callbackList.get(file);
+
+            if (currentCallback == null)
+            {
+               currentCallback = new TransactionCallback();
+               callbackList.put(file, currentCallback);
+            }
+
+            if (currentCallback.errorMessage != null)
+            {
+               throw new MessagingException(currentCallback.errorCode, currentCallback.errorMessage);
+            }
+
+            currentCallback.countUp();
+
+            return currentCallback;
+         }
+         else
+         {
+            return null;
+         }
       }
 
       public void addPositive(final JournalFile file, final long id)
@@ -2945,6 +2990,26 @@
          }
       }
 
+      public void waitCallbacks() throws Exception
+      {
+         if (callbackList != null)
+         {
+            for (TransactionCallback callback : callbackList.values())
+            {
+               callback.waitCompletion();
+            }
+         }
+      }
+
+      /** Wait completion at the latest file only */
+      public void waitCompletion() throws Exception
+      {
+         if (currentCallback != null)
+         {
+            currentCallback.waitCompletion();
+         }
+      }
+
       /** 
        * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
        * or else potFilesMap could be affected
@@ -3096,7 +3161,7 @@
 
             for (int i = 0; i < pages; i++)
             {
-               appendRecord(bb, false, null);
+               appendRecord(bb, false, null, null);
             }
 
             lock.release();

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -340,16 +340,14 @@
          messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
                                                      ADD_LARGE_MESSAGE,
-                                                     new LargeMessageEncoding(((LargeServerMessage)message)),
-                                                     syncTransactional);
+                                                     new LargeMessageEncoding(((LargeServerMessage)message)));
       }
       else
       {
          messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
                                                      ADD_MESSAGE,
-                                                     message,
-                                                     syncTransactional);
+                                                     message);
       }
 
    }
@@ -360,7 +358,7 @@
       {
          // Instead of updating the record, we delete the old one as that is
          // better for reclaiming
-         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), syncTransactional);
+         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
       }
 
       pageTransaction.setRecordID(generateUniqueID());
@@ -368,8 +366,7 @@
       messageJournal.appendAddRecordTransactional(txID,
                                                   pageTransaction.getRecordID(),
                                                   PAGE_TRANSACTION,
-                                                  pageTransaction,
-                                                  syncTransactional);
+                                                  pageTransaction);
    }
 
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -377,8 +374,7 @@
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      ADD_REF,
-                                                     new RefEncoding(queueID),
-                                                     syncTransactional);
+                                                     new RefEncoding(queueID));
    }
 
    public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -386,13 +382,12 @@
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      ACKNOWLEDGE_REF,
-                                                     new RefEncoding(queueID),
-                                                     syncTransactional);
+                                                     new RefEncoding(queueID));
    }
 
    public void deletePageTransactional(final long txID, final long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
+      messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
 
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -403,13 +398,12 @@
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      ref.getMessage().getMessageID(),
                                                      SET_SCHEDULED_DELIVERY_TIME,
-                                                     encoding,
-                                                     syncTransactional);
+                                                     encoding);
    }
 
    public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID), syncTransactional);
+      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
    }
 
    public void prepare(final long txID, final Xid xid) throws Exception
@@ -434,7 +428,7 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
+      messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
    }
 
    public void updateDuplicateIDTransactional(final long txID,
@@ -444,12 +438,12 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
+      messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
    }
 
    public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
+      messageJournal.appendDeleteRecordTransactional(txID, recordID);
    }
 
    // Other operations

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -221,7 +221,7 @@
             long startTrans = System.currentTimeMillis();
             for (int j = 0; j < 1000; j++)
             {
-               journal.appendAddRecordTransactional(i, count++, (byte)0, data, true);
+               journal.appendAddRecordTransactional(i, count++, (byte)0, data);
             }
 
             journal.appendCommitRecord(i, true);

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -215,7 +215,7 @@
                
                if (transactionSize != 0)
                {
-                  journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array(), false);
+                  journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
         
                   if (++transactionCounter == transactionSize)
                   {

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -390,7 +390,7 @@
       assertEquals(0, records.size());
       assertEquals(0, transactions.size());
 
-      journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+      journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1));
 
       setupJournal(JOURNAL_SIZE, 100);
 
@@ -429,7 +429,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+         journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
          journalImpl.forceMoveNextFile();
       }
 
@@ -437,7 +437,7 @@
 
       assertEquals(12, factory.listFiles("tt").size());
 
-      journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+      journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
 
       assertEquals(12, factory.listFiles("tt").size());
 
@@ -478,7 +478,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1), false);
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1));
          journalImpl.forceMoveNextFile();
       }
 
@@ -501,7 +501,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i, false);
+         journalImpl.appendDeleteRecordTransactional(2l, i);
          journalImpl.forceMoveNextFile();
       }
 
@@ -536,7 +536,7 @@
       journalImpl.appendAddRecordTransactional(1l,
                                                2l,
                                                (byte)3,
-                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4), false);
+                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
 
       journalImpl.appendCommitRecord(1l, false);
 
@@ -561,7 +561,7 @@
 
       for (int i = 0; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
          journalImpl.forceMoveNextFile();
       }
 
@@ -626,8 +626,8 @@
 
       for (int i = 0; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
-         journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
          journalImpl.forceMoveNextFile();
       }
 
@@ -734,13 +734,13 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
          journalImpl.forceMoveNextFile();
       }
 
       for (int i = 10; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
          journalImpl.forceMoveNextFile();
       }
 
@@ -796,7 +796,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
       }
 
       journalImpl.forceMoveNextFile();
@@ -807,7 +807,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i, false);
+         journalImpl.appendDeleteRecordTransactional(2l, i);
       }
 
       journalImpl.appendCommitRecord(2l, false);
@@ -841,7 +841,7 @@
          {
             journalImpl.forceMoveNextFile();
          }
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
       }
 
       journalImpl.appendCommitRecord(1l, false);
@@ -852,7 +852,7 @@
          {
             journalImpl.forceMoveNextFile();
          }
-         journalImpl.appendDeleteRecordTransactional(2l, i, false);
+         journalImpl.appendDeleteRecordTransactional(2l, i);
       }
 
       journalImpl.appendCommitRecord(2l, false);
@@ -878,7 +878,7 @@
 
       journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0), false);
 
-      journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'), false);
+      journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'));
 
       journalImpl.appendPrepareRecord(1, xid, false);
 
@@ -929,7 +929,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
          journalImpl.forceMoveNextFile();
       }
 
@@ -968,7 +968,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i, false);
+         journalImpl.appendDeleteRecordTransactional(2l, i);
       }
 
       SimpleEncoding xid2 = new SimpleEncoding(15, (byte)2);
@@ -1018,7 +1018,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
          journalImpl.forceMoveNextFile();
       }
 
@@ -1067,7 +1067,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
          journalImpl.forceMoveNextFile();
       }
 
@@ -1096,7 +1096,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
       }
 
       journalImpl.appendCommitRecord(1l, false);
@@ -1119,7 +1119,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
       }
 
       journalImpl.appendCommitRecord(1l, false);
@@ -1203,7 +1203,7 @@
                latchStart.await();
                for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
                {
-                  journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+                  journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1));
                   journalImpl.appendCommitRecord(i, false);
                   queueDelete.offer(i);
                }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -85,7 +85,7 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
                }
 
                latch.countDown();
@@ -147,7 +147,7 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
                }
 
                journalImpl.appendRollbackRecord(1l, true);
@@ -211,7 +211,7 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
                }
 
                journalImpl.appendCommitRecord(1l, true);
@@ -268,7 +268,7 @@
       factory.setHoldCallbacks(true, null);
       factory.setGenerateErrors(true);
 
-      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), true);
+      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0));
 
       factory.flushAllCallbacks();
 
@@ -277,7 +277,7 @@
 
       try
       {
-         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), true);
+         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0));
          fail("Exception expected"); // An exception already happened in one
          // of the elements on this transaction.
          // We can't accept any more elements on

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -270,7 +270,7 @@
          // SIZE_BYTE
          byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
 
-         journal.appendAddRecordTransactional(txID, element, (byte)0, record, sync);
+         journal.appendAddRecordTransactional(txID, element, (byte)0, record);
 
          tx.records.add(new RecordInfo(element, (byte)0, record, false));
 
@@ -287,7 +287,7 @@
       {
          byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
 
-         journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord, sync);
+         journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
 
          tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
       }
@@ -300,7 +300,7 @@
 
       for (long element : arguments)
       {
-         journal.appendDeleteRecordTransactional(txID, element, sync);
+         journal.appendDeleteRecordTransactional(txID, element);
 
          tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
       }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java	2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java	2009-06-23 22:48:09 UTC (rev 7446)
@@ -147,7 +147,7 @@
                                                                                                            0,
                                                                                                            1,
                                                                                                            2,
-                                                                                                           5 }, true);
+                                                                                                           5 });
          }
 
          // After this is complete, you're sure the records are there




More information about the jboss-cvs-commits mailing list