Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 16:40:56 -0500 (Tue, 17 Nov 2009)
New Revision: 8302
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Removed:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
another iteration
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
(from rev 8301,
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface CompletionContext
+{
+ /** To be called by the replication manager, when new replication is added to the
queue */
+ void linedUp();
+
+ boolean hasData();
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ void replicated();
+
+ void afterCompletion(Runnable runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
+
+ /** Replication may need some extra controls to guarantee ordering
+ * when nothing is persisted through the contexts
+ * @return The context is empty
+ */
+ boolean isEmpty();
+
+ void setEmpty(boolean empty);
+
+}
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
(from rev 8301,
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java)
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
(rev 0)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.completion.CompletionContext;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few
dependencies first.
+ *
+ */
+public class CompletionContextImpl implements CompletionContext
+{
+ private static final ThreadLocal<CompletionContext> tlReplicationContext = new
ThreadLocal<CompletionContext>();
+
+ public static CompletionContext getContext()
+ {
+ CompletionContext token = tlReplicationContext.get();
+ if (token == null)
+ {
+ token = new CompletionContextImpl();
+ tlReplicationContext.set(token);
+ }
+ return token;
+ }
+
+ private List<Runnable> tasks;
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean empty = false;
+
+ private volatile boolean complete = false;
+
+ /**
+ * @param executor
+ */
+ public CompletionContextImpl()
+ {
+ super();
+ }
+
+ /** To be called by the replication manager, when new replication is added to the
queue */
+ public void linedUp()
+ {
+ linedup++;
+ }
+
+ public boolean hasData()
+ {
+ return linedup > 0;
+ }
+
+ /** You may have several actions to be done after a replication operation is
completed. */
+ public void afterCompletion(Runnable runnable)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+ throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need to use Concurrent, we only add from a single thread.
+ // We don't add any more Runnables after it is complete
+ tasks = new ArrayList<Runnable>();
+ }
+
+ tasks.add(runnable);
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void replicated()
+ {
+ if (++replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public synchronized void complete()
+ {
+ tlReplicationContext.set(null);
+ complete = true;
+ if (replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (tasks != null)
+ {
+ for (Runnable run : tasks)
+ {
+ run.run();
+ }
+ tasks.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+ */
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public void setEmpty(final boolean sync)
+ {
+ this.empty = sync;
+ }
+
+}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -56,7 +56,7 @@
boolean isReplicated();
- void afterReplicated(Runnable run);
+ void afterCompletion(Runnable run);
/** Block until the replication is done.
* @throws Exception */
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -33,6 +33,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
@@ -310,7 +311,7 @@
public void waitOnReplication(final long timeout) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- afterReplicated(new Runnable()
+ afterCompletion(new Runnable()
{
public void run()
{
@@ -363,13 +364,9 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
- if (replicator == null)
- {
- throw new IllegalStateException("StorageManager is not replicated");
- }
- replicator.afterReplicated(run);
+ CompletionContextImpl.getContext().afterCompletion(run);
}
public UUID getPersistentID()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -284,7 +284,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
run.run();
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -926,7 +926,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationContext
-{
- /** To be called by the replication manager, when new replication is added to the
queue */
- void linedUp();
-
- boolean hasReplication();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void addReplicationAction(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
- boolean isSync();
-
- void setSync(boolean sync);
-
-}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -15,6 +15,7 @@
import java.util.Set;
+import org.hornetq.core.completion.CompletionContext;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -49,13 +50,10 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
- /** Add an action to be executed after the pending replications */
- void afterReplicated(Runnable runnable);
-
void closeContext();
/** A list of tokens that are still waiting for replications to be completed */
- Set<ReplicationContext> getActiveTokens();
+ Set<CompletionContext> getActiveTokens();
/**
* @param storeName
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -1,126 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.replication.ReplicationContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationContextImpl implements ReplicationContext
-{
- private List<Runnable> tasks;
-
- private int linedup = 0;
-
- private int replicated = 0;
-
- private boolean sync = false;
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public ReplicationContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the
queue */
- public void linedUp()
- {
- linedup++;
- }
-
- public boolean hasReplication()
- {
- return linedup > 0;
- }
-
- /** You may have several actions to be done after a replication operation is
completed. */
- public void addReplicationAction(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- // roundtrip packets won't have lined up packets
- if (++replicated == linedup && complete)
- {
- flush();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- complete = true;
- if (replicated == linedup && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isSync()
- {
- return sync;
- }
-
- public void setSync(final boolean sync)
- {
- this.sync = sync;
- }
-
-}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -13,14 +13,17 @@
package org.hornetq.core.replication.impl;
-import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -45,9 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.SimpleString;
/**
@@ -81,10 +82,8 @@
private final Object replicationLock = new Object();
- private final ThreadLocal<ReplicationContext> tlReplicationContext = new
ThreadLocal<ReplicationContext>();
+ private final Queue<CompletionContext> pendingTokens = new
ConcurrentLinkedQueue<CompletionContext>();
- private final Queue<ReplicationContext> pendingTokens = new
ConcurrentLinkedQueue<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -360,17 +359,17 @@
{
enabled = false;
- LinkedHashSet<ReplicationContext> activeContexts = new
LinkedHashSet<ReplicationContext>();
+ LinkedHashSet<CompletionContext> activeContexts = new
LinkedHashSet<CompletionContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
while (!pendingTokens.isEmpty())
{
- ReplicationContext ctx = pendingTokens.poll();
+ CompletionContext ctx = pendingTokens.poll();
activeContexts.add(ctx);
}
- for (ReplicationContext ctx : activeContexts)
+ for (CompletionContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
@@ -393,38 +392,17 @@
started = false;
}
- public ReplicationContext getContext()
- {
- ReplicationContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new ReplicationContextImpl();
- tlReplicationContext.set(token);
- }
- return token;
- }
-
/* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
- */
- public void afterReplicated(final Runnable runnable)
- {
- getContext().addReplicationAction(runnable);
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#completeToken()
*/
public void closeContext()
{
- final ReplicationContext token = tlReplicationContext.get();
+ final CompletionContext token = getContext();
if (token != null)
{
- // Disassociate thread local
- tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
- if (!token.hasReplication())
+ if (!token.hasData())
{
sync(token);
}
@@ -432,18 +410,19 @@
}
}
+
/* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public Set<ReplicationContext> getActiveTokens()
+ public Set<CompletionContext> getActiveTokens()
{
- LinkedHashSet<ReplicationContext> activeContexts = new
LinkedHashSet<ReplicationContext>();
+ LinkedHashSet<CompletionContext> activeContexts = new
LinkedHashSet<CompletionContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
- for (ReplicationContext ctx : pendingTokens)
+ for (CompletionContext ctx : pendingTokens)
{
activeContexts.add(ctx);
}
@@ -456,7 +435,7 @@
{
boolean runItNow = false;
- ReplicationContext repliToken = getContext();
+ CompletionContext repliToken = getContext();
repliToken.linedUp();
synchronized (replicationLock)
@@ -493,9 +472,9 @@
private void replicated()
{
- ArrayList<ReplicationContext> tokensToExecute = getTokens();
+ List<CompletionContext> tokensToExecute = getTokens();
- for (ReplicationContext ctx : tokensToExecute)
+ for (CompletionContext ctx : tokensToExecute)
{
ctx.replicated();
}
@@ -507,13 +486,13 @@
// Private -------------------------------------------------------
- private void sync(ReplicationContext context)
+ private void sync(CompletionContext context)
{
boolean executeNow = false;
synchronized (replicationLock)
{
context.linedUp();
- context.setSync(true);
+ context.setEmpty(true);
if (pendingTokens.isEmpty())
{
// this means the list is empty and we should process it now
@@ -532,6 +511,11 @@
}
}
+
+ public CompletionContext getContext()
+ {
+ return CompletionContextImpl.getContext();
+ }
/**
* This method will first get all the sync tokens (that won't go to the backup
node)
@@ -539,11 +523,11 @@
* At last, if the list is empty, it will verify if there are any future tokens that
are sync tokens, to avoid a case where no more replication is done due to inactivity.
* @return
*/
- private ArrayList<ReplicationContext> getTokens()
+ private List<CompletionContext> getTokens()
{
- ArrayList<ReplicationContext> retList = new
ArrayList<ReplicationContext>(1);
+ List<CompletionContext> retList = new LinkedList<CompletionContext>();
- ReplicationContext tokenPolled = null;
+ CompletionContext tokenPolled = null;
// First will get all the non replicated tokens up to the first one that is not
replicated
do
@@ -558,16 +542,16 @@
retList.add(tokenPolled);
}
- while (tokenPolled.isSync());
+ while (tokenPolled.isEmpty());
// This is to avoid a situation where we won't have more replicated packets
- // all the packets will need to be processed
+ // We need to make sure we process any pending sync packet up to the next non empty
packet
synchronized (replicationLock)
{
- while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isEmpty())
{
tokenPolled = pendingTokens.poll();
- if (!tokenPolled.isSync())
+ if (!tokenPolled.isEmpty())
{
throw new IllegalStateException("Replicatoin context is not a
roundtrip token as expected");
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -146,7 +146,7 @@
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -1720,7 +1720,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -244,7 +244,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(execAfterCommit);
+ storageManager.afterCompletion(execAfterCommit);
}
else
{
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -32,6 +32,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -272,20 +273,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- assertEquals(1, manager.getActiveTokens().size());
-
blockOnReplication(manager);
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
ServerMessage msg = new ServerMessageImpl();
@@ -386,7 +375,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
{
@@ -413,7 +402,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
@@ -469,7 +458,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
@@ -541,7 +530,7 @@
}
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17
20:01:28 UTC (rev 8301)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17
21:40:56 UTC (rev 8302)
@@ -1155,7 +1155,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
}