[hornetq-commits] JBoss hornetq SVN: r11142 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 5 11:01:37 EDT 2011


Author: borges
Date: 2011-08-05 11:01:37 -0400 (Fri, 05 Aug 2011)
New Revision: 11142

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 _Initial_ support for replication during sync.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-05 15:01:37 UTC (rev 11142)
@@ -218,6 +218,8 @@
 
    private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
 
+   private final boolean hasCallbackSupport;
+
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
    {
       this(config, executorFactory, null);
@@ -304,8 +306,8 @@
       {
          throw new IllegalArgumentException("Unsupported journal type " + config.getJournalType());
       }
+      hasCallbackSupport = journalFF.isSupportsCallbacks();
 
-
       idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
 
       Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -3480,4 +3482,9 @@
       journal.stop();
    }
 
+   public boolean hasCallbackSupport()
+   {
+      return hasCallbackSupport;
+   }
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-05 15:01:37 UTC (rev 11142)
@@ -456,7 +456,8 @@
       JournalImpl journal = assertJournalImpl(journalIf);
       Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
       JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
-      registerJournal(packet.getJournalContentType().typeByte, new ReplicatingJournal(current));
+      registerJournal(packet.getJournalContentType().typeByte,
+                      new ReplicatingJournal(current, storage.hasCallbackSupport()));
    }
 
    // XXX HORNETQ-720 really need to do away with this once the method calls get stable.

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	2011-08-05 15:01:37 UTC (rev 11142)
@@ -1,7 +1,11 @@
 package org.hornetq.core.journal.impl;
 
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOCompletion;
 import org.hornetq.core.journal.Journal;
@@ -10,6 +14,9 @@
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
 
 /**
  * Journal used at a replicating backup server during the synchronization of data with the 'live'
@@ -19,69 +26,120 @@
  */
 public class ReplicatingJournal implements Journal
 {
+   private final ReentrantLock lockAppend = new ReentrantLock();
+   private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
 
    private final JournalFile file;
+   private final boolean hasCallbackSupport;
 
    /**
     * @param file
     */
-   public ReplicatingJournal(JournalFile file)
+   public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
    {
       this.file = file;
+      this.hasCallbackSupport = hasCallbackSupport;
    }
 
    @Override
    public void start() throws Exception
    {
-      // TODO Auto-generated method stub
-
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void stop() throws Exception
    {
-      // TODO Auto-generated method stub
-
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public boolean isStarted()
    {
-      // TODO Auto-generated method stub
-      return false;
+      throw new UnsupportedOperationException();
    }
 
+   // ------------------------
    @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
-                                                                                                                   throws Exception
+            throws Exception
    {
-      throw new UnsupportedOperationException();
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
    }
 
    @Override
    public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      appendAddRecord(id, recordType, record, sync, callback);
+
+      if (callback != null)
+      {
+         callback.waitCompletion();
+      }
    }
 
+   // ------------------------
+
+   private void readLockJournal()
+   {
+      journalLock.readLock().lock();
+   }
+
+   private void readUnlockJournal()
+   {
+      journalLock.readLock().unlock();
+   }
+
    @Override
    public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
-                               IOCompletion completionCallback) throws Exception
+            IOCompletion callback) throws Exception
    {
-      throw new UnsupportedOperationException();
+      JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
 
+         if (callback != null)
+         {
+            callback.storeLineUp();
+         }
+
+         lockAppend.lock();
+         try
+         {
+            JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+         }
+         finally
+         {
+            lockAppend.unlock();
+         }
+
    }
 
+   /**
+    * @param addRecord
+    * @param b
+    * @param sync
+    * @param object
+    * @param callback
+    * @return
+    */
+   private JournalFile appendRecord(JournalInternalRecord addRecord, boolean b, boolean sync, Object object,
+            IOCompletion callback)
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
    @Override
    public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
@@ -89,146 +147,145 @@
             appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
                      throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync,
-                                  IOCompletion completionCallback) throws Exception
+            IOCompletion completionCallback) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendDeleteRecord(long id, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
             throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
-   public void
-            appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+   public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
+            throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
             throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
-   public void
-            appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
-                                                                                                       throws Exception
+   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
+            throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public JournalLoadInformation loadInternalOnly() throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
@@ -242,13 +299,13 @@
             List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
             throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
    public int getAlignment() throws Exception
    {
-      throw new UnsupportedOperationException();
+      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
    }
 
    @Override
@@ -274,4 +331,17 @@
    {
       throw new UnsupportedOperationException();
    }
+
+   private SyncIOCompletion getSyncCallback(final boolean sync)
+   {
+      if (hasCallbackSupport)
+      {
+         if (sync)
+         {
+            return new SimpleWaitIOCallback();
+         }
+         return DummyCallback.getInstance();
+      }
+      return null;
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-05 15:00:17 UTC (rev 11141)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-05 15:01:37 UTC (rev 11142)
@@ -72,8 +72,9 @@
 
       // SEND more messages, now with the backup replicating
       sendMessages(session, producer, N_MSGS);
+      handler.deliver = true;
+      sendMessages(session, producer, 1);
 
-      handler.notifyAll();
       waitForBackup(sessionFactory, 10, true);
 
       Set<Long> liveIds = getFileIds(messageJournal);
@@ -199,7 +200,6 @@
       public BackupSyncDelay(ReplicationChannelHandler handler)
       {
          this.handler = handler;
-         // TODO Auto-generated constructor stub
       }
 
       @Override
@@ -228,6 +228,8 @@
    {
 
       private ChannelHandler handler;
+      private Packet onHold;
+      public volatile boolean deliver;
 
       public void addSubHandler(ChannelHandler handler)
       {
@@ -237,21 +239,17 @@
       @Override
       public void handlePacket(Packet packet)
       {
-         System.out.println(packet);
+         if (onHold != null && deliver)
+         {
+            handler.handlePacket(onHold);
+         }
          if (packet.getType() == PacketImpl.REPLICATION_SYNC)
          {
             ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
             if (syncMsg.isUpToDate())
             {
-               // Hold the message that notifies the backup that sync is done.
-               try
-               {
-                  wait();
-               }
-               catch (InterruptedException e)
-               {
-                  // no-op
-               }
+               onHold = packet;
+               return;
             }
          }
          handler.handlePacket(packet);



More information about the hornetq-commits mailing list