[hornetq-commits] JBoss hornetq SVN: r8154 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 27 21:01:51 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-27 21:01:50 -0400 (Tue, 27 Oct 2009)
New Revision: 8154

Added:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Backup changes

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-28 01:01:50 UTC (rev 8154)
@@ -69,7 +69,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class JournalImpl implements TestableJournal
+public class JournalImpl implements TestableJournal, JournalLock
 {
 
    // Constants -----------------------------------------------------
@@ -2736,7 +2736,44 @@
          lockAppend.unlock();
       }
    }
+   
+   // JournalLock Interface ------------------------------------------------------
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+    */
+   public void readLock()
+   {
+      globalLock.readLock().lock();
+   }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+    */
+   public void readUnlock()
+   {
+      globalLock.readLock().unlock();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+    */
+   public void writeLock()
+   {
+      globalLock.writeLock().lock();
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+    */
+   public void writeUnLock()
+   {
+      globalLock.writeLock().unlock();
+   }
+
+
+
    // Public
    // -----------------------------------------------------------------------------
 

Added: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java	                        (rev 0)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java	2009-10-28 01:01:50 UTC (rev 8154)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+/**
+ * This interface is used to share the lock operations done at the Journal level.
+ * 
+ * Some Journal delegates may need to make sure the global locks are shared with the caller.
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalLock
+{
+   void readLock();
+   
+   void readUnlock();
+   
+   void writeLock();
+   
+   void writeUnLock();
+}

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-28 01:01:50 UTC (rev 8154)
@@ -195,7 +195,7 @@
 
       if (replicator != null)
       {
-         this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+         this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, localBindings, replicator);
       }
       else
       {
@@ -263,7 +263,7 @@
 
       if (replicator != null)
       {
-         this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+         this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage, replicator);
       }
       else
       {

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-28 01:01:50 UTC (rev 8154)
@@ -21,6 +21,7 @@
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
 import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -49,14 +50,20 @@
 
    private final Journal localJournal;
 
+   private final JournalLock journalLock;
+
    private final byte journalID;
 
-   public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
+   public ReplicatedJournal(final byte journaID,
+                            final JournalLock journalLock,
+                            final Journal localJournal,
+                            final ReplicationManager replicationManager)
    {
       super();
       journalID = journaID;
       this.localJournal = localJournal;
       this.replicationManager = replicationManager;
+      this.journalLock = journalLock;
    }
 
    public ReplicatedJournal(final byte journaID, final ReplicationManager replicationManager)
@@ -64,6 +71,7 @@
       super();
       journalID = journaID;
       localJournal = null;
+      journalLock = null;
       this.replicationManager = replicationManager;
    }
 
@@ -104,11 +112,19 @@
       {
          trace("Append record id = " + id + " recordType = " + recordType);
       }
-      replicationManager.appendAddRecord(journalID, id, recordType, record);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendAddRecord(id, recordType, record, sync);
+         replicationManager.appendAddRecord(journalID, id, recordType, record);
+         if (localJournal != null)
+         {
+            localJournal.appendAddRecord(id, recordType, record, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
    }
 
    /**
@@ -141,11 +157,20 @@
       {
          trace("Append record TXid = " + id + " recordType = " + recordType);
       }
-      replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+         replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+         if (localJournal != null)
+         {
+            localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -160,11 +185,20 @@
       {
          trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendCommitRecord(txID, sync);
+         replicationManager.appendCommitRecord(journalID, txID);
+         if (localJournal != null)
+         {
+            localJournal.appendCommitRecord(txID, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -179,11 +213,20 @@
       {
          trace("AppendDelete " + id);
       }
-      replicationManager.appendDeleteRecord(journalID, id);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendDeleteRecord(id, sync);
+         replicationManager.appendDeleteRecord(journalID, id);
+         if (localJournal != null)
+         {
+            localJournal.appendDeleteRecord(id, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -211,11 +254,20 @@
       {
          trace("AppendDelete txID=" + txID + " id=" + id);
       }
-      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendDeleteRecordTransactional(txID, id, record);
+         replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+         if (localJournal != null)
+         {
+            localJournal.appendDeleteRecordTransactional(txID, id, record);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -230,11 +282,20 @@
       {
          trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
       }
-      replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendDeleteRecordTransactional(txID, id);
+         replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+         if (localJournal != null)
+         {
+            localJournal.appendDeleteRecordTransactional(txID, id);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -262,11 +323,20 @@
       {
          trace("AppendPrepare txID=" + txID);
       }
-      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendPrepareRecord(txID, transactionData, sync);
+         replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+         if (localJournal != null)
+         {
+            localJournal.appendPrepareRecord(txID, transactionData, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -281,11 +351,20 @@
       {
          trace("AppendRollback " + txID);
       }
-      replicationManager.appendRollbackRecord(journalID, txID);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendRollbackRecord(txID, sync);
+         replicationManager.appendRollbackRecord(journalID, txID);
+         if (localJournal != null)
+         {
+            localJournal.appendRollbackRecord(txID, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -315,11 +394,20 @@
       {
          trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       }
-      replicationManager.appendUpdateRecord(journalID, id, recordType, record);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendUpdateRecord(id, recordType, record, sync);
+         replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+         if (localJournal != null)
+         {
+            localJournal.appendUpdateRecord(id, recordType, record, sync);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -355,11 +443,20 @@
       {
          trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
       }
-      replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
-      if (localJournal != null)
+      preAppend();
+      try
       {
-         localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+         replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+         if (localJournal != null)
+         {
+            localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+         }
       }
+      finally
+      {
+         afterAppend();
+      }
+
    }
 
    /**
@@ -502,6 +599,22 @@
 
    // Private -------------------------------------------------------
 
+   private void preAppend()
+   {
+      if (journalLock != null)
+      {
+         journalLock.readLock();
+      }
+   }
+
+   private void afterAppend()
+   {
+      if (journalLock != null)
+      {
+         journalLock.readUnlock();
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-27 23:48:09 UTC (rev 8153)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-28 01:01:50 UTC (rev 8154)
@@ -41,6 +41,7 @@
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
@@ -169,7 +170,7 @@
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
-         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
 
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
@@ -272,11 +273,11 @@
       config.setBackup(true);
 
       ArrayList<String> intercepts = new ArrayList<String>();
-      
+
       intercepts.add(TestInterceptor.class.getName());
-      
+
       config.setInterceptorClassNames(intercepts);
-      
+
       HornetQServer server = new HornetQServerImpl(config);
 
       server.start();
@@ -288,7 +289,7 @@
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
-         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
 
          Thread.sleep(100);
          TestInterceptor.value.set(false);
@@ -308,7 +309,7 @@
          });
 
          manager.closeContext();
-         
+
          server.stop();
 
          assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -357,7 +358,7 @@
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
-         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+         Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
 
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
@@ -505,8 +506,7 @@
 
    };
 
-
-   static class FakeJournal implements Journal
+   static class FakeJournal implements Journal, JournalLock
    {
 
       /* (non-Javadoc)
@@ -720,5 +720,33 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+       */
+      public void readLock()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+       */
+      public void readUnlock()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+       */
+      public void writeLock()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+       */
+      public void writeUnLock()
+      {
+      }
+
    }
 }



More information about the hornetq-commits mailing list