Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 21:01:50 -0400 (Tue, 27 Oct 2009)
New Revision: 8154
Added:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27
23:48:09 UTC (rev 8153)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-28
01:01:50 UTC (rev 8154)
@@ -69,7 +69,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
-public class JournalImpl implements TestableJournal
+public class JournalImpl implements TestableJournal, JournalLock
{
// Constants -----------------------------------------------------
@@ -2736,7 +2736,44 @@
lockAppend.unlock();
}
}
+
+ // JournalLock Interface ------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+ */
+ public void readLock()
+ {
+ globalLock.readLock().lock();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+ */
+ public void readUnlock()
+ {
+ globalLock.readLock().unlock();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+ */
+ public void writeLock()
+ {
+ globalLock.writeLock().lock();
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+ */
+ public void writeUnLock()
+ {
+ globalLock.writeLock().unlock();
+ }
+
+
+
// Public
// -----------------------------------------------------------------------------
Added: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java
(rev 0)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalLock.java 2009-10-28
01:01:50 UTC (rev 8154)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+/**
+ * This interface is used to share the lock operations done at the Journal level.
+ *
+ * Some Journal delegates may need to make sure the global locks are shared with the caller.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalLock
+{
+ void readLock();
+
+ void readUnlock();
+
+ void writeLock();
+
+ void writeUnLock();
+}
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27
23:48:09 UTC (rev 8153)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28
01:01:50 UTC (rev 8154)
@@ -195,7 +195,7 @@
if (replicator != null)
{
- this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings,
replicator);
+ this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings,
localBindings, replicator);
}
else
{
@@ -263,7 +263,7 @@
if (replicator != null)
{
- this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage,
replicator);
}
else
{
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27
23:48:09 UTC (rev 8153)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28
01:01:50 UTC (rev 8154)
@@ -21,6 +21,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -49,14 +50,20 @@
private final Journal localJournal;
+ private final JournalLock journalLock;
+
private final byte journalID;
- public ReplicatedJournal(final byte journaID, final Journal localJournal, final
ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID,
+ final JournalLock journalLock,
+ final Journal localJournal,
+ final ReplicationManager replicationManager)
{
super();
journalID = journaID;
this.localJournal = localJournal;
this.replicationManager = replicationManager;
+ this.journalLock = journalLock;
}
public ReplicatedJournal(final byte journaID, final ReplicationManager
replicationManager)
@@ -64,6 +71,7 @@
super();
journalID = journaID;
localJournal = null;
+ journalLock = null;
this.replicationManager = replicationManager;
}
@@ -104,11 +112,19 @@
{
trace("Append record id = " + id + " recordType = " +
recordType);
}
- replicationManager.appendAddRecord(journalID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendAddRecord(id, recordType, record, sync);
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
}
/**
@@ -141,11 +157,20 @@
{
trace("Append record TXid = " + id + " recordType = " +
recordType);
}
- replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -160,11 +185,20 @@
{
trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendCommitRecord(txID, sync);
+ replicationManager.appendCommitRecord(journalID, txID);
+ if (localJournal != null)
+ {
+ localJournal.appendCommitRecord(txID, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -179,11 +213,20 @@
{
trace("AppendDelete " + id);
}
- replicationManager.appendDeleteRecord(journalID, id);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecord(id, sync);
+ replicationManager.appendDeleteRecord(journalID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecord(id, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -211,11 +254,20 @@
{
trace("AppendDelete txID=" + txID + " id=" + id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecordTransactional(txID, id, record);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id,
record);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -230,11 +282,20 @@
{
trace("AppendDelete (noencoding) txID=" + txID + " id=" +
id);
}
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendDeleteRecordTransactional(txID, id);
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -262,11 +323,20 @@
{
trace("AppendPrepare txID=" + txID);
}
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendPrepareRecord(txID, transactionData, sync);
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ if (localJournal != null)
+ {
+ localJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -281,11 +351,20 @@
{
trace("AppendRollback " + txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendRollbackRecord(txID, sync);
+ replicationManager.appendRollbackRecord(journalID, txID);
+ if (localJournal != null)
+ {
+ localJournal.appendRollbackRecord(txID, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -315,11 +394,20 @@
{
trace("AppendUpdateRecord id = " + id + " , recordType = " +
recordType);
}
- replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -355,11 +443,20 @@
{
trace("AppendUpdateRecord txid=" + txID + " id = " + id +
" , recordType = " + recordType);
}
- replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType,
record);
- if (localJournal != null)
+ preAppend();
+ try
{
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id,
recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
}
+ finally
+ {
+ afterAppend();
+ }
+
}
/**
@@ -502,6 +599,22 @@
// Private -------------------------------------------------------
+ private void preAppend()
+ {
+ if (journalLock != null)
+ {
+ journalLock.readLock();
+ }
+ }
+
+ private void afterAppend()
+ {
+ if (journalLock != null)
+ {
+ journalLock.readUnlock();
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified:
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27
23:48:09 UTC (rev 8153)
+++
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28
01:01:50 UTC (rev 8154)
@@ -41,6 +41,7 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalLock;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -169,7 +170,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
@@ -272,11 +273,11 @@
config.setBackup(true);
ArrayList<String> intercepts = new ArrayList<String>();
-
+
intercepts.add(TestInterceptor.class.getName());
-
+
config.setInterceptorClassNames(intercepts);
-
+
HornetQServer server = new HornetQServerImpl(config);
server.start();
@@ -288,7 +289,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
new FakeJournal(), manager);
Thread.sleep(100);
TestInterceptor.value.set(false);
@@ -308,7 +309,7 @@
});
manager.closeContext();
-
+
server.stop();
assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -357,7 +358,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
@@ -505,8 +506,7 @@
};
-
- static class FakeJournal implements Journal
+ static class FakeJournal implements Journal, JournalLock
{
/* (non-Javadoc)
@@ -720,5 +720,33 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readLock()
+ */
+ public void readLock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#readUnlock()
+ */
+ public void readUnlock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeLock()
+ */
+ public void writeLock()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalLock#writeUnLock()
+ */
+ public void writeUnLock()
+ {
+ }
+
}
}