[hornetq-commits] JBoss hornetq SVN: r8045 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Oct 5 15:18:07 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-05 15:18:07 -0400 (Mon, 05 Oct 2009)
New Revision: 8045
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -48,6 +48,8 @@
void afterReplicated(Runnable run);
+ void completeReplication();
+
UUID getPersistentID();
void setPersistentID(UUID id) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -254,7 +254,7 @@
}
else
{
- this.messageJournal = localBindings;
+ this.messageJournal = localMessage;
}
@@ -265,6 +265,17 @@
perfBlastPages = config.getJournalPerfBlastPages();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#completeReplication()
+ */
+ public void completeReplication()
+ {
+ if (replicator != null)
+ {
+ replicator.completeToken();
+ }
+ }
+
public boolean isReplicated()
{
return replicator != null;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -261,4 +261,11 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#completeReplication()
+ */
+ public void completeReplication()
+ {
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -144,18 +144,38 @@
tx.commit();
- count++;
-
- if (count == batchSize)
+
+ Runnable action = new Runnable()
{
- // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
- // long time in the case there are many messages in the queue
- active = false;
-
- executor.execute(new Prompter());
-
- count = 0;
+ public void run()
+ {
+
+ count++;
+
+ if (count == batchSize)
+ {
+ // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+ // long time in the case there are many messages in the queue
+ active = false;
+
+
+ executor.execute(new Prompter());
+
+ count = 0;
+ }
+
+ }
+ };
+
+ if (storageManager.isReplicated())
+ {
+ storageManager.afterReplicated(action);
+ storageManager.completeReplication();
}
+ else
+ {
+ action.run();
+ }
}
private class Prompter implements Runnable
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -755,35 +755,6 @@
return nodeID;
}
- public void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception
- {
- Binding binding = postOffice.getBinding(queueName);
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find queue " + queueName);
- }
-
- Queue queue = (Queue)binding.getBindable();
-
- MessageReference reference = queue.removeFirstReference(messageID);
-
- Transaction tx = new TransactionImpl(storageManager);
-
- boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
-
- if (routed)
- {
- queue.acknowledge(tx, reference);
-
- tx.commit();
- }
- else
- {
- throw new IllegalStateException("Must be routed");
- }
- }
-
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -420,9 +420,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleCreateQueue(final CreateQueueMessage packet)
@@ -495,9 +493,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
@@ -533,9 +529,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
@@ -590,9 +584,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
@@ -636,9 +628,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleAcknowledge(final SessionAcknowledgeMessage packet)
@@ -673,12 +663,7 @@
}
}
- channel.confirm(packet);
-
- if (response != null)
- {
- channel.send(response);
- }
+ sendResponse(packet, response, false, false);
}
public void handleExpired(final SessionExpiredMessage packet)
@@ -697,7 +682,8 @@
log.error("Failed to acknowledge", e);
}
- channel.confirm(packet);
+
+ sendResponse(packet, null, false, false);
}
public void handleCommit(final Packet packet)
@@ -728,9 +714,7 @@
tx = new TransactionImpl(storageManager);
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleRollback(final RollbackMessage packet)
@@ -757,9 +741,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXACommit(final SessionXACommitMessage packet)
@@ -820,9 +802,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAEnd(final SessionXAEndMessage packet)
@@ -894,9 +874,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAForget(final SessionXAForgetMessage packet)
@@ -906,9 +884,7 @@
Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAJoin(final SessionXAJoinMessage packet)
@@ -957,9 +933,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAResume(final SessionXAResumeMessage packet)
@@ -1019,9 +993,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXARollback(final SessionXARollbackMessage packet)
@@ -1082,9 +1054,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAStart(final SessionXAStartMessage packet)
@@ -1133,9 +1103,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXASuspend(final Packet packet)
@@ -1182,9 +1150,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleXAPrepare(final SessionXAPrepareMessage packet)
@@ -1242,43 +1208,35 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleGetInDoubtXids(final Packet packet)
{
Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleGetXATimeout(final Packet packet)
{
Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
{
Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleStart(final Packet packet)
{
setStarted(true);
- channel.confirm(packet);
+ sendResponse(packet, null, false, false);
}
public void handleStop(final Packet packet)
@@ -1287,9 +1245,7 @@
setStarted(false);
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleClose(final Packet packet)
@@ -1316,14 +1272,8 @@
}
}
- channel.confirm(packet);
+ sendResponse(packet, response, true, true);
- // We flush the confirmations to make sure any send confirmations get handled on the client side
- channel.flushConfirmations();
-
- channel.send(response);
-
- channel.close();
}
public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
@@ -1359,9 +1309,7 @@
}
}
- channel.confirm(packet);
-
- channel.send(response);
+ sendResponse(packet, response, false, false);
}
public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
@@ -1383,7 +1331,8 @@
log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);
}
- channel.confirm(packet);
+
+ sendResponse(packet, null, false, false);
}
public void handleSendLargeMessage(final SessionSendLargeMessage packet)
@@ -1418,7 +1367,7 @@
log.error("Failed to send message", e);
}
- channel.confirm(packet);
+ sendResponse(packet, null, false, false);
}
public void handleSend(final SessionSendMessage packet)
@@ -1465,13 +1414,8 @@
}
}
}
-
- channel.confirm(packet);
-
- if (response != null)
- {
- channel.send(response);
- }
+
+ sendResponse(packet, response, false, false);
}
public void handleSendContinuations(final SessionSendContinuationMessage packet)
@@ -1520,12 +1464,7 @@
}
}
- channel.confirm(packet);
-
- if (response != null)
- {
- channel.send(response);
- }
+ sendResponse(packet, response, false, false);
}
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
@@ -1638,6 +1577,50 @@
// Private
// ----------------------------------------------------------------------------
+ /**
+ * Respond to client after replication
+ * @param packet
+ * @param response
+ */
+ private void sendResponse(final Packet confirmPacket, final Packet response, final boolean flush, final boolean closeChannel)
+ {
+ Runnable action = new Runnable()
+ {
+ public void run()
+ {
+ if (confirmPacket != null)
+ {
+ channel.confirm(confirmPacket);
+ if (flush)
+ {
+ channel.flushConfirmations();
+ }
+ }
+
+ if (response != null)
+ {
+ channel.send(response);
+ }
+
+ if (closeChannel)
+ {
+ channel.close();
+ }
+ }
+ };
+
+ if (storageManager.isReplicated())
+ {
+ storageManager.afterReplicated(action);
+ storageManager.completeReplication();
+ }
+ else
+ {
+ action.run();
+ }
+ }
+
+
private void setStarted(final boolean s)
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-05 15:13:57 UTC (rev 8044)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-05 19:18:07 UTC (rev 8045)
@@ -201,7 +201,61 @@
server.stop();
}
}
+
+ public void testNoActions() throws Exception
+ {
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+ manager.start();
+
+ Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new FakeJournal(), manager);
+
+ replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.afterReplicated(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(1, manager.getActiveTokens().size());
+
+ manager.completeToken();
+
+ for (int i = 0; i < 100; i++)
+ {
+ // This is asynchronous. Have to wait completion
+ if (manager.getActiveTokens().size() == 0)
+ {
+ break;
+ }
+ Thread.sleep(1);
+ }
+
+ assertEquals(0, manager.getActiveTokens().size());
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
class FakeData implements EncodingSupport
{
More information about the hornetq-commits
mailing list