[hornetq-commits] JBoss hornetq SVN: r11085 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 1 09:50:45 EDT 2011


Author: borges
Date: 2011-08-01 09:50:45 -0400 (Mon, 01 Aug 2011)
New Revision: 11085

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 INCOMPLETE: Lock StorageManager before re-assigning journal fields.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 13:50:45 UTC (rev 11085)
@@ -32,6 +32,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.transaction.xa.Xid;
 
@@ -155,6 +156,7 @@
    public static final byte PAGE_CURSOR_COUNTER_INC = 41;
 
    private final BatchingIDGenerator idGenerator;
+   private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
 
    private ReplicationManager replicator;
 
@@ -361,31 +363,37 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR,
                                     "journals here are not JournalImpl. You can't set a replicator!");
       }
+      JournalFile[] messageFiles = null;
+      JournalFile[] bindingsFiles = null;
+
       // XXX HORNETQ-720 WRITE LOCK the StorageManager.
+      storageManagerLock.writeLock().lock();
+      try
+      {
+         final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+         final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
 
-      final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
-      final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
 
-      JournalFile[] messageFiles;
-      JournalFile[] bindingsFiles;
-
-      localMessageJournal.writeLock();
-      localBindingsJournal.writeLock();
-      try
-      {
-         messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-         bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+         localMessageJournal.writeLock();
+         localBindingsJournal.writeLock();
+         try
+         {
+            messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+            bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+         }
+         finally
+         {
+            localMessageJournal.writeUnlock();
+            localBindingsJournal.writeUnlock();
+         }
+         bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+         messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
       }
       finally
       {
-         localMessageJournal.writeUnlock();
-         localBindingsJournal.writeUnlock();
+         // XXX HORNETQ-720 UNLOCK StorageManager...
+         storageManagerLock.writeLock().unlock();
       }
-      bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
-      messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
-
-      // XXX HORNETQ-720 UNLOCK StorageManager...
-
       sendJournalFile(messageFiles, JournalContent.MESSAGES);
       sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
 
@@ -571,6 +579,7 @@
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
 
+      readLock();
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
 
       if (message.isLargeMessage())
@@ -589,29 +598,45 @@
                                         false,
                                         getContext(false));
       }
+      readUnLock();
    }
 
    public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
 
    {
+      readLock();
       messageJournal.appendUpdateRecord(messageID,
                                         JournalStorageManager.ADD_REF,
                                         new RefEncoding(queueID),
                                         last && syncNonTransactional,
                                         getContext(last && syncNonTransactional));
+      readUnLock();
    }
 
+   private void readLock()
+   {
+      storageManagerLock.readLock().lock();
+   }
+
+   private void readUnLock()
+   {
+      storageManagerLock.readLock().unlock();
+   }
+
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
    {
+      readLock();
       messageJournal.appendUpdateRecord(messageID,
                                         JournalStorageManager.ACKNOWLEDGE_REF,
                                         new RefEncoding(queueID),
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
+      readUnLock();
    }
 
    public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
    {
+      readLock();
       long ackID = idGenerator.generateID();
       position.setRecordID(ackID);
       messageJournal.appendAddRecord(ackID,
@@ -619,19 +644,23 @@
                                      new CursorAckRecordEncoding(queueID, position),
                                      syncNonTransactional,
                                      getContext(syncNonTransactional));
+      readUnLock();
    }
 
    public void deleteMessage(final long messageID) throws Exception
    {
+      readLock();
       // Messages are deleted on postACK, one after another.
       // If these deletes are synchronized, we would build up messages on the Executor
       // increasing chances of losing deletes.
       // The StorageManager should verify messages without references
       messageJournal.appendDeleteRecord(messageID, false, getContext(false));
+      readUnLock();
    }
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
    {
+      readLock();
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
                                                                                                             .getID());
 
@@ -640,10 +669,12 @@
                                         encoding,
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
+      readUnLock();
    }
 
    public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
+      readLock();
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
       messageJournal.appendAddRecord(recordID,
@@ -651,17 +682,21 @@
                                      encoding,
                                      syncNonTransactional,
                                      getContext(syncNonTransactional));
+      readUnLock();
    }
 
    public void deleteDuplicateID(final long recordID) throws Exception
    {
+      readLock();
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+      readUnLock();
    }
 
    // Transactional operations
 
    public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
    {
+      readLock();
       if (message.getMessageID() <= 0)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
@@ -681,51 +716,61 @@
                                                      JournalStorageManager.ADD_MESSAGE,
                                                      message);
       }
-
+      readUnLock();
    }
 
    public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
    {
+      readLock();
       pageTransaction.setRecordID(generateUniqueID());
 
       messageJournal.appendAddRecordTransactional(txID,
                                                   pageTransaction.getRecordID(),
                                                   JournalStorageManager.PAGE_TRANSACTION,
                                                   pageTransaction);
+      readUnLock();
    }
 
    public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
    {
+      readLock();
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      pageTransaction.getRecordID(),
                                                      JournalStorageManager.PAGE_TRANSACTION,
                                                      new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
                                                                               depages));
+      readUnLock();
    }
 
    public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
    {
+      readLock();
       messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
                                         JournalStorageManager.PAGE_TRANSACTION,
                                         new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
                                         syncNonTransactional,
                                         getContext(syncNonTransactional));
+      readUnLock();
    }
 
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
+      readLock();
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ADD_REF,
                                                      new RefEncoding(queueID));
+      readUnLock();
    }
 
    public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
+      readLock();
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ACKNOWLEDGE_REF,
                                                      new RefEncoding(queueID));
+      readUnLock();
    }
 
    /* (non-Javadoc)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-01 13:50:45 UTC (rev 11085)
@@ -378,6 +378,7 @@
             journal.finishRemoteBackupSync();
          }
          server.setRemoteBackupUpToDate();
+         log.info("Backup server " + server + " is synchronized with live-server.");
          return;
       }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-01 12:50:41 UTC (rev 11084)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-01 13:50:45 UTC (rev 11085)
@@ -580,7 +580,7 @@
                     }
                     catch (Exception e)
                     {
-                        log.warn("unable to announce backup for replication", e);
+                     log.warn("Unable to announce backup for replication.", e);
                     }
                 }
             });



More information about the hornetq-commits mailing list