[hornetq-commits] JBoss hornetq SVN: r8038 - in branches/Replication_Clebert: src/main/org/hornetq/core/replication and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Oct 2 13:15:12 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-02 13:15:12 -0400 (Fri, 02 Oct 2009)
New Revision: 8038
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPrepareMessage.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -46,7 +46,7 @@
public ReplicationPrepareMessage()
{
- super(REPLICATION_DELETE_TX);
+ super(REPLICATION_PREPARE);
}
public ReplicationPrepareMessage(byte journalID, long txId, EncodingSupport encodingData)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication;
+import java.util.Set;
+
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.server.HornetQComponent;
@@ -42,5 +44,13 @@
void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
void appendRollbackRecord(byte journalID, long txID) throws Exception;
+
+ /** Add an action to be executed after the pending replications */
+ void addReplicationAction(Runnable runnable);
+
+ void completeToken();
+
+ /** The set of tokens that are still waiting for replications to be completed */
+ Set<ReplicationToken> getActiveTokens();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationToken.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -30,7 +30,7 @@
/** To be called by the replication manager, when data is confirmed on the channel */
void replicated();
- void addFutureCompletion(Runnable runnable);
+ void addReplicationAction(Runnable runnable);
/** To be called when there are no more operations pending */
void complete();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -122,7 +122,7 @@
Configuration config = server.getConfiguration();
// TODO: this needs an executor
- JournalStorageManager storage = new JournalStorageManager(config, null);
+ storage = new JournalStorageManager(config, null);
storage.start();
this.bindingsJournal = storage.getBindingsJournal();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -14,12 +14,12 @@
package org.hornetq.core.replication.impl;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
@@ -36,6 +36,7 @@
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
+import org.hornetq.utils.ConcurrentHashSet;
/**
* A RepplicationManagerImpl
@@ -75,6 +76,8 @@
private final Queue<ReplicationToken> pendingTokens = new ConcurrentLinkedQueue<ReplicationToken>();
+ private final ConcurrentHashSet<ReplicationToken> activeTokens = new ConcurrentHashSet<ReplicationToken>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -121,7 +124,6 @@
sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
}
-
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
*/
@@ -134,7 +136,6 @@
sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
}
-
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
*/
@@ -232,11 +233,50 @@
if (token == null)
{
token = new ReplicationTokenImpl(executor);
+ activeTokens.add(token);
repliToken.set(token);
}
return token;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
+ */
+ public void addReplicationAction(Runnable runnable)
+ {
+ getReplicationToken().addReplicationAction(runnable);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#completeToken()
+ */
+ public void completeToken()
+ {
+ final ReplicationToken token = repliToken.get();
+ if (token != null)
+ {
+ // Disassociate thread local
+ repliToken.set(null);
+ // Remove from the active tokens as soon as the pending replications are complete
+ token.addReplicationAction(new Runnable()
+ {
+ public void run()
+ {
+ activeTokens.remove(token);
+ }
+ });
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#getActiveTokens()
+ */
+ public Set<ReplicationToken> getActiveTokens()
+ {
+ return activeTokens;
+ }
+
+
private void sendReplicatePacket(final Packet packet)
{
boolean runItNow = false;
@@ -298,7 +338,6 @@
*/
public void handlePacket(Packet packet)
{
- System.out.println("HandlePacket on client");
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
{
replicated();
@@ -311,7 +350,7 @@
{
static NullEncoding instance = new NullEncoding();
-
+
public void decode(final HornetQBuffer buffer)
{
}
@@ -327,4 +366,5 @@
}
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -66,7 +66,7 @@
}
/** You may have several actions to be done after a replication operation is completed. */
- public synchronized void addFutureCompletion(Runnable runnable)
+ public synchronized void addReplicationAction(Runnable runnable)
{
System.out.println("pendings on addFutureCompletion = " + pendings);
if (pendings == 0)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -354,6 +354,12 @@
{
storageManager.stop();
}
+
+ if (replicationEndpoint != null)
+ {
+ replicationEndpoint.stop();
+ replicationEndpoint = null;
+ }
if (securityManager != null)
{
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02 15:59:19 UTC (rev 8037)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-02 17:15:12 UTC (rev 8038)
@@ -146,20 +146,22 @@
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
-
+
+ manager.appendPrepareRecord((byte)0, 100, new FakeData());
+
manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
manager.appendDeleteRecord((byte)0, 1);
manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
manager.appendCommitRecord((byte)0, 2);
-
- manager.appendDeleteRecordTransactional((byte)0, 3, 4,new FakeData());
+
+ manager.appendDeleteRecordTransactional((byte)0, 3, 4, new FakeData());
manager.appendPrepareRecord((byte)0, 3, new FakeData());
manager.appendRollbackRecord((byte)0, 3);
final CountDownLatch latch = new CountDownLatch(1);
- manager.getReplicationToken().addFutureCompletion(new Runnable()
+ manager.getReplicationToken().addReplicationAction(new Runnable()
{
public void run()
@@ -169,6 +171,21 @@
});
assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(1, manager.getActiveTokens().size());
+
+ manager.completeToken();
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ // This is asynchronous. Have to wait completion
+ if (manager.getActiveTokens().size() == 0)
+ {
+ break;
+ }
+ Thread.sleep(1);
+ }
+
+ assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
finally
@@ -265,6 +282,7 @@
protected void tearDown() throws Exception
{
+
executor.shutdown();
scheduledExecutor.shutdown();
@@ -274,6 +292,9 @@
connectionManager = null;
scheduledExecutor = null;
+
+ super.tearDown();
+
}
// Private -------------------------------------------------------
More information about the hornetq-commits
mailing list