Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 14:40:59 -0500 (Tue, 17 Nov 2009)
New Revision: 8300
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
My refactoring starting point
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -147,6 +147,4 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
- void sync();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -498,14 +498,6 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
- public void sync()
- {
- if (replicator != null)
- {
- replicator.sync();
- }
- }
-
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -914,13 +914,6 @@
}
}
}
- else
- {
- if (storageManager.isReplicated())
- {
- storageManager.sync();
- }
- }
message.incrementRefCount(reference);
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -243,6 +243,8 @@
log.warn("Connection failure has been detected: " + me.getMessage() +
" [code=" + me.getCode() + "]");
+ System.out.println("Fail on RemotingConnectio");
+
// Then call the listeners
callFailureListeners(me);
@@ -399,6 +401,7 @@
for (final FailureListener listener : listenersClone)
{
+ System.out.println("Calling failure listener: " +
listener.getClass().getName());
try
{
listener.connectionFailed(me);
@@ -419,6 +422,8 @@
for (final CloseListener listener : listenersClone)
{
+ System.out.println("Calling listener -> " + listener);
+ System.out.println("Calling listener " +
listener.getClass().getName());
try
{
listener.connectionClosed();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -26,6 +26,8 @@
{
/** To be called by the replication manager, when new replication is added to the
queue */
void linedUp();
+
+ boolean hasReplication();
/** To be called by the replication manager, when data is confirmed on the channel */
void replicated();
@@ -37,5 +39,9 @@
/** Flush all pending callbacks on the Context */
void flush();
+
+ boolean isSync();
+
+ void setSync(boolean sync);
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -88,6 +88,4 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
- void sync();
-
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -29,11 +29,15 @@
public class ReplicationContextImpl implements ReplicationContext
{
private List<Runnable> tasks;
-
- private AtomicInteger pendings = new AtomicInteger(0);
-
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean sync = false;
+
private volatile boolean complete = false;
-
+
/**
* @param executor
*/
@@ -45,9 +49,14 @@
/** To be called by the replication manager, when new replication is added to the
queue */
public void linedUp()
{
- pendings.incrementAndGet();
+ linedup++;
}
+ public boolean hasReplication()
+ {
+ return linedup > 0;
+ }
+
/** You may have several actions to be done after a replication operation is
completed. */
public void addReplicationAction(Runnable runnable)
{
@@ -63,32 +72,32 @@
// We don't add any more Runnables after it is complete
tasks = new ArrayList<Runnable>();
}
-
+
tasks.add(runnable);
}
/** To be called by the replication manager, when data is confirmed on the channel */
public synchronized void replicated()
{
- if (pendings.decrementAndGet() == 0 && complete)
+ // roundtrip packets won't have lined up packets
+ if (++replicated == linedup && complete)
{
flush();
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationToken#complete()
*/
public synchronized void complete()
{
complete = true;
- if (pendings.get() == 0 && complete)
+ if (replicated == linedup && complete)
{
flush();
}
- }
-
+ }
+
public synchronized void flush()
{
if (tasks != null)
@@ -100,6 +109,18 @@
tasks.clear();
}
}
-
-
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+ */
+ public boolean isSync()
+ {
+ return sync;
+ }
+
+ public void setSync(final boolean sync)
+ {
+ this.sync = sync;
+ }
+
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication.impl;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,7 +36,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -84,8 +85,6 @@
private final Queue<ReplicationContext> pendingTokens = new
ConcurrentLinkedQueue<ReplicationContext>();
- private final ConcurrentHashSet<ReplicationContext> activeContexts = new
ConcurrentHashSet<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -278,14 +277,6 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
-
- public void sync()
- {
- if (enabled)
- {
- sendReplicatePacket(new ReplicationSyncContextMessage());
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
@@ -351,9 +342,9 @@
log.warn(e.getMessage(), e);
}
}
-
+
public void beforeReconnect(HornetQException me)
- {
+ {
}
});
@@ -369,14 +360,22 @@
{
enabled = false;
+ LinkedHashSet<ReplicationContext> activeContexts = new
LinkedHashSet<ReplicationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+ while (!pendingTokens.isEmpty())
+ {
+ ReplicationContext ctx = pendingTokens.poll();
+ activeContexts.add(ctx);
+ }
+
for (ReplicationContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
}
-
- activeContexts.clear();
-
+
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -400,7 +399,6 @@
if (token == null)
{
token = new ReplicationContextImpl();
- activeContexts.add(token);
tlReplicationContext.set(token);
}
return token;
@@ -420,29 +418,38 @@
public void closeContext()
{
final ReplicationContext token = tlReplicationContext.get();
-
+
if (token != null)
{
// Disassociate thread local
tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
- token.addReplicationAction(new Runnable()
+ if (!token.hasReplication())
{
- public void run()
- {
- activeContexts.remove(token);
- }
- });
+ sync(token);
+ }
token.complete();
}
}
- /* (non-Javadoc)
+ /* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
public Set<ReplicationContext> getActiveTokens()
{
+
+ LinkedHashSet<ReplicationContext> activeContexts = new
LinkedHashSet<ReplicationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+
+ for (ReplicationContext ctx : pendingTokens)
+ {
+ activeContexts.add(ctx);
+ }
+
return activeContexts;
+
}
private void sendReplicatePacket(final Packet packet)
@@ -475,7 +482,7 @@
repliToken.replicated();
}
}
-
+
/* (non-Javadoc)
* @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
*/
@@ -484,18 +491,14 @@
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
-
private void replicated()
{
- ReplicationContext tokenPolled = pendingTokens.poll();
- if (tokenPolled == null)
+ ArrayList<ReplicationContext> tokensToExecute = getTokens();
+
+ for (ReplicationContext ctx : tokensToExecute)
{
- throw new IllegalStateException("Missing replication token on the
queue.");
+ ctx.replicated();
}
- else
- {
- tokenPolled.replicated();
- }
}
// Package protected ---------------------------------------------
@@ -504,6 +507,80 @@
// Private -------------------------------------------------------
+ private void sync(ReplicationContext context)
+ {
+ boolean executeNow = false;
+ synchronized (replicationLock)
+ {
+ context.linedUp();
+ context.setSync(true);
+ if (pendingTokens.isEmpty())
+ {
+ // this means the list is empty and we should process it now
+ executeNow = true;
+ }
+ else
+ {
+ // adding the sync to be executed in order
+ // as soon as the reponses are back from the backup
+ this.pendingTokens.add(context);
+ }
+ }
+ if (executeNow)
+ {
+ context.replicated();
+ }
+ }
+
+
+ /**
+ * This method will first get all the sync tokens (that won't go to the backup
node)
+ * Then it will get the round trip tokens.
+ * At last, if the list is empty, it will verify if there are any future tokens that
are sync tokens, to avoid a case where no more replication is done due to inactivity.
+ * @return
+ */
+ private ArrayList<ReplicationContext> getTokens()
+ {
+ ArrayList<ReplicationContext> retList = new
ArrayList<ReplicationContext>(1);
+
+ ReplicationContext tokenPolled = null;
+
+ // First will get all the non replicated tokens up to the first one that is not
replicated
+ do
+ {
+ tokenPolled = pendingTokens.poll();
+
+ if (tokenPolled == null)
+ {
+ throw new IllegalStateException("Missing replication token on the
queue.");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ while (tokenPolled.isSync());
+
+ // This is to avoid a situation where we won't have more replicated packets
+ // all the packets will need to be processed
+ synchronized (replicationLock)
+ {
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+ {
+ tokenPolled = pendingTokens.poll();
+ if (!tokenPolled.isSync())
+ {
+ throw new IllegalStateException("Replicatoin context is not a
roundtrip token as expected");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ }
+
+ return retList;
+ }
+
+
// Inner classes -------------------------------------------------
protected class ResponseHandler implements ChannelHandler
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17
19:40:49 UTC (rev 8299)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17
19:40:59 UTC (rev 8300)
@@ -539,10 +539,6 @@
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- else
- {
- manager.sync();
- }
manager.afterReplicated(new Runnable()