JBoss hornetq SVN: r8311 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-18 11:47:53 -0500 (Wed, 18 Nov 2009)
New Revision: 8311
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
HORNETQ-220: Swallowed IOOBE (IndexOutOfBoundsException) on the Queue's Handler
* reset the handlers' pos when a handler is removed
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-18 15:11:45 UTC (rev 8310)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-18 16:47:53 UTC (rev 8311)
@@ -895,6 +895,10 @@
{
iter.remove();
+ if (pos >= handlers.size())
+ {
+ pos = 0;
+ }
removed = true;
break;
15 years, 1 month
JBoss hornetq SVN: r8310 - in trunk/tests/src/org/hornetq/tests/integration: persistence and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-18 10:11:45 -0500 (Wed, 18 Nov 2009)
New Revision: 8310
Removed:
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
Modified:
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
Log:
just a cleanup (removed duplicated tests)
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-18 15:05:07 UTC (rev 8309)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-18 15:11:45 UTC (rev 8310)
@@ -13,15 +13,13 @@
package org.hornetq.tests.integration.largemessage;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import java.util.concurrent.Executors;
+
+import org.hornetq.core.config.Configuration;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -44,65 +42,34 @@
// Public --------------------------------------------------------
- // The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
- public void testSendServerMessage() throws Exception
+ public void testLargeMessageCopy() throws Exception
{
- HornetQServer server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createFactory(false);
-
- ClientSession session = sf.createSession(false, false);
-
- try
+ clearData();
+
+ Configuration configuration = createDefaultConfig();
+
+ configuration.start();
+
+ configuration.setJournalType(JournalType.ASYNCIO);
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+ journal.start();
+
+ LargeServerMessage msg = journal.createLargeMessage();
+ msg.setMessageID(1);
+
+ byte[] data = new byte[1024];
+
+ for (int i = 0; i < 110; i++)
{
- FileLargeServerMessage fileMessage = new FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
-
- fileMessage.setMessageID(1005);
-
- for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
- {
- fileMessage.addBytes(new byte[]{getSamplebyte(i)});
- }
-
- fileMessage.releaseResources();
-
- session.createQueue("A", "A");
-
- ClientProducer prod = session.createProducer("A");
-
- prod.send(fileMessage);
-
- fileMessage.deleteFile();
-
- session.commit();
-
- session.start();
-
- ClientConsumer cons = session.createConsumer("A");
-
- ClientMessage msg = cons.receive(5000);
-
- assertNotNull(msg);
-
- assertEquals(msg.getBodySize(), 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- for (int i = 0 ; i < 2 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
- {
- assertEquals(getSamplebyte(i), msg.getBody().readByte());
- }
-
- msg.acknowledge();
-
- session.commit();
-
+ msg.addBytes(data);
}
- finally
- {
- sf.close();
- server.stop();
- }
+
+ ServerMessage msg2 = msg.copy(2);
+
+ assertEquals(110 * 1024, msg.getBodySize());
+ assertEquals(110 * 1024, msg2.getBodySize());
+
}
// Package protected ---------------------------------------------
Deleted: trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-11-18 15:05:07 UTC (rev 8309)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-11-18 15:11:45 UTC (rev 8310)
@@ -1,85 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.persistence;
-
-import java.util.concurrent.Executors;
-
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A JournalStorageManagerIntegrationTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * Created Jan 24, 2009 11:14:13 PM
- *
- *
- */
-public class JournalStorageManagerIntegrationTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testLargeMessageCopy() throws Exception
- {
- clearData();
-
- Configuration configuration = createDefaultConfig();
-
- configuration.start();
-
- configuration.setJournalType(JournalType.ASYNCIO);
-
- final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
- journal.start();
-
- LargeServerMessage msg = journal.createLargeMessage();
- msg.setMessageID(1);
-
- byte[] data = new byte[1024];
-
- for (int i = 0; i < 110; i++)
- {
- msg.addBytes(data);
- }
-
- ServerMessage msg2 = msg.copy(2);
-
- assertEquals(110 * 1024, msg.getBodySize());
- assertEquals(110 * 1024, msg2.getBodySize());
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
15 years, 1 month
JBoss hornetq SVN: r8309 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-18 10:05:07 -0500 (Wed, 18 Nov 2009)
New Revision: 8309
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
Log:
HORNETQ-220: Swallowed IOOBE (IndexOutOfBoundsException) on the Queue's Handler
* added test showing the swallowed IOOBE
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-18 10:28:42 UTC (rev 8308)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-18 15:05:07 UTC (rev 8309)
@@ -567,7 +567,31 @@
clientSession = sessionFactory.createSession(false, true, true);
clientSession.createQueue(qName, qName, null, false);
}
+
+ // do not swallow exception in DeliverRunner.run() to show the IOOBE on the queue handlers
+ public void testSwallowedIndexOutOfBoundsException() throws Exception
+ {
+ ClientConsumer consumer = clientSession.createConsumer(qName, null, false);
+ ClientConsumer consumer2 = clientSession.createConsumer(qName, null, false);
+ ClientProducer producer = clientSession.createProducer(qName);
+ ClientMessage message = createTextMessage("m0" , clientSession);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString("g1"));
+ producer.send(message);
+
+ clientSession.start();
+
+ ClientMessage msg = consumer.receive();
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertNull(consumer.receive(500));
+
+ consumer.close();
+ consumer2.close();
+ consumer = clientSession.createConsumer(qName, null, false);
+ assertNull(consumer.receive(500));
+ }
+
private static class DummyMessageHandler implements MessageHandler
{
ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();
15 years, 1 month
JBoss hornetq SVN: r8308 - trunk/src/main/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-18 05:28:42 -0500 (Wed, 18 Nov 2009)
New Revision: 8308
Modified:
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
do not attempt reconnection on the replication connection when the ReplicationManager is stopped
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18 09:54:38 UTC (rev 8307)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18 10:28:42 UTC (rev 8308)
@@ -367,6 +367,11 @@
*/
public void stop() throws Exception
{
+ if (!started)
+ {
+ return;
+ }
+
enabled = false;
for (ReplicationContext ctx : activeContexts)
@@ -384,6 +389,8 @@
started = false;
+ failoverManager.causeExit();
+
if (connection != null)
{
connection.destroy();
15 years, 1 month
JBoss hornetq SVN: r8307 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-18 04:54:38 -0500 (Wed, 18 Nov 2009)
New Revision: 8307
Modified:
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
stop the ReplicationManager when stopping the server
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-18 04:38:26 UTC (rev 8306)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-18 09:54:38 UTC (rev 8307)
@@ -384,7 +384,13 @@
replicationEndpoint.stop();
replicationEndpoint = null;
}
-
+
+ if (replicationManager != null)
+ {
+ replicationManager.stop();
+ replicationManager = null;
+ }
+
if (securityManager != null)
{
securityManager.stop();
15 years, 1 month
JBoss hornetq SVN: r8306 - branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 23:38:26 -0500 (Tue, 17 Nov 2009)
New Revision: 8306
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
Log:
tweaks: null-check the operations list before running the after-commit callbacks
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18 04:31:45 UTC (rev 8305)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18 04:38:26 UTC (rev 8306)
@@ -222,26 +222,29 @@
// to execute this runnable in the correct order
storageManager.afterCompleteOperations(new IOCompletion()
{
-
+
public void onError(int errorCode, String errorMessage)
{
log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
}
-
+
public void done()
{
- for (TransactionOperation operation : operations)
+ if (operations != null)
{
- try
+ for (TransactionOperation operation : operations)
{
- operation.afterCommit(TransactionImpl.this);
+ try
+ {
+ operation.afterCommit(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
}
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- log.warn(e.getMessage(), e);
- }
}
}
});
15 years, 1 month
JBoss hornetq SVN: r8305 - branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 23:31:45 -0500 (Tue, 17 Nov 2009)
New Revision: 8305
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Fixing context
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 04:19:10 UTC (rev 8304)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 04:31:45 UTC (rev 8305)
@@ -300,6 +300,10 @@
{
replicator.closeContext();
}
+ else
+ {
+ OperationContextImpl.getContext().complete();
+ }
}
public boolean isReplicated()
15 years, 1 month
JBoss hornetq SVN: r8304 - in branches/ClebertTemporary: src/main/org/hornetq/core/postoffice/impl and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 23:19:10 -0500 (Tue, 17 Nov 2009)
New Revision: 8304
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
more changes
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -444,27 +444,27 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new LargeMessageEncoding((LargeServerMessage)message),
- false);
+ false, getIOContext());
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false);
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getIOContext());
}
}
public void storeReference(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional, getIOContext());
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getIOContext());
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getIOContext());
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -475,19 +475,19 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional);
+ syncNonTransactional, getIOContext());
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getIOContext());
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
}
// Transactional operations
@@ -543,13 +543,13 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true);
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getIOContext());
return id;
}
public void deleteHeuristicCompletion(long id) throws Exception
{
- messageJournal.appendDeleteRecord(id, true);
+ messageJournal.appendDeleteRecord(id, true, getIOContext());
}
public void deletePageTransactional(final long txID, final long recordID) throws Exception
@@ -575,17 +575,17 @@
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional);
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getIOContext());
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional);
+ messageJournal.appendCommitRecord(txID, syncTransactional, getIOContext());
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID, syncTransactional);
+ messageJournal.appendRollbackRecord(txID, syncTransactional, getIOContext());
}
public void storeDuplicateIDTransactional(final long txID,
@@ -623,7 +623,7 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT,
updateInfo,
- syncNonTransactional);
+ syncNonTransactional, getIOContext());
}
private static final class AddMessageRecord
@@ -1368,6 +1368,11 @@
}
// Private ----------------------------------------------------------------------------------
+
+ private IOCompletion getIOContext()
+ {
+ return OperationContextImpl.getContext();
+ }
private void checkAndCreateDir(final String dir, final boolean create)
{
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -28,6 +28,7 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -924,10 +925,15 @@
}
else
{
- storageManager.afterCompleteOperations(new Runnable()
+ storageManager.afterCompleteOperations(new IOCompletion()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ log.warn("It wasn't possible to add references due to an IO error code " + errorCode + " message = " + errorMessage);
+ }
+
+ public void done()
+ {
addReferences(refs);
}
});
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -27,7 +28,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
-
/**
* Used by the {@link JournalStorageManager} to replicate journal calls.
*
@@ -46,7 +46,7 @@
// Attributes ----------------------------------------------------
private static final boolean trace = false;
-
+
private static void trace(String message)
{
System.out.println("ReplicatedJournal::" + message);
@@ -58,9 +58,7 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID,
- final Journal localJournal,
- final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
{
super();
journalID = journaID;
@@ -69,11 +67,11 @@
}
// Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
/**
* @param id
* @param recordType
@@ -87,6 +85,21 @@
this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ trace("Append record id = " + id + " recordType = " + recordType);
+ }
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
+ }
+
/**
* @param id
* @param recordType
@@ -95,14 +108,18 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ IOCompletion completionCallback) throws Exception
{
if (trace)
{
trace("Append record id = " + id + " recordType = " + recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
}
/**
@@ -155,6 +172,19 @@
localJournal.appendCommitRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendCommit " + txID);
+ }
+ replicationManager.appendCommitRecord(journalID, txID);
+ localJournal.appendCommitRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param sync
@@ -171,6 +201,19 @@
localJournal.appendDeleteRecord(id, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ localJournal.appendDeleteRecord(id, sync, completionCallback);
+ }
+
/**
* @param txID
* @param id
@@ -245,6 +288,27 @@
localJournal.appendPrepareRecord(txID, transactionData, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendPrepare txID=" + txID);
+ }
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback) throws Exception
+ {
+ this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, callback);
+ }
+
/**
* @param txID
* @param sync
@@ -261,6 +325,19 @@
localJournal.appendRollbackRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendRollback " + txID);
+ }
+ replicationManager.appendRollbackRecord(journalID, txID);
+ localJournal.appendRollbackRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param recordType
@@ -291,7 +368,34 @@
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
localJournal.appendUpdateRecord(id, recordType, record, sync);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+ }
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ localJournal.appendUpdateRecord(id, recordType, record, sync, completionCallback);
+ }
+
+
+
/**
* @param txID
* @param id
@@ -338,8 +442,8 @@
* @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
*/
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback transactionFailure) throws Exception
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final TransactionFailureCallback transactionFailure) throws Exception
{
return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -16,6 +16,7 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -144,13 +145,20 @@
tx.commit();
- storageManager.afterCompleteOperations(new Runnable()
+ storageManager.afterCompleteOperations(new IOCompletion()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ log.warn("IO Error during redistribution, errorCode = " + errorCode + " message = " + errorMessage);
+ }
+
+ public void done()
+ {
execPrompter();
}
});
+
storageManager.completeOperations();
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+x * Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -34,6 +34,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -1718,13 +1719,22 @@
final boolean flush,
final boolean closeChannel)
{
- storageManager.afterCompleteOperations(new Runnable()
+ storageManager.afterCompleteOperations(new IOCompletion()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
+
+ HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode, errorMessage));
+
+ doSendResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ }
+
+ public void done()
+ {
doSendResponse(confirmPacket, response, flush, closeChannel);
}
-
});
storageManager.completeOperations();
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -19,6 +19,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -219,10 +220,16 @@
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
- storageManager.afterCompleteOperations(new Runnable()
+ storageManager.afterCompleteOperations(new IOCompletion()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ log.warn("IO Error completing the transaction, code = " + errorCode + ", message = " + errorMessage);
+ }
+
+ public void done()
+ {
for (TransactionOperation operation : operations)
{
try
@@ -238,6 +245,7 @@
}
}
});
+
}
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -38,6 +38,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -375,10 +376,15 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
});
@@ -402,14 +408,17 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
manager.closeContext();
@@ -458,14 +467,17 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
manager.closeContext();
@@ -505,41 +517,41 @@
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
int numberOfAdds = 200;
-
+
final CountDownLatch latch = new CountDownLatch(numberOfAdds);
-
+
for (int i = 0; i < numberOfAdds; i++)
{
final int nAdd = i;
-
+
if (i % 2 == 0)
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
-
- OperationContextImpl.getContext().executeOnCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOCompletion()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
- executions.add(nAdd);
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
manager.closeContext();
}
-
+
assertTrue(latch.await(10, TimeUnit.SECONDS));
-
for (int i = 0; i < numberOfAdds; i++)
{
assertEquals(i, executions.get(i).intValue());
}
-
+
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -868,5 +880,80 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ byte[] record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
}
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18 02:52:31 UTC (rev 8303)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18 04:19:10 UTC (rev 8304)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -1232,6 +1233,15 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOCompletion run)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
15 years, 1 month
JBoss hornetq SVN: r8303 - in branches/ClebertTemporary: src/main/org/hornetq/core/completion and 16 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 21:52:31 -0500 (Tue, 17 Nov 2009)
New Revision: 8303
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
Removed:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Modified:
branches/ClebertTemporary/.classpath
branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Refactoring II - won't commit at this point (some stuff pending and I wanted to save this checkpoint)
Modified: branches/ClebertTemporary/.classpath
===================================================================
--- branches/ClebertTemporary/.classpath 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/.classpath 2009-11-18 02:52:31 UTC (rev 8303)
@@ -7,7 +7,7 @@
<classpathentry kind="src" path="tests/config"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
<attributes>
- <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk/native/bin"/>
+ <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk-tmp/native/bin"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="tests/jms-tests/src"/>
Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface CompletionContext
-{
- /** To be called by the replication manager, when new replication is added to the queue */
- void linedUp();
-
- boolean hasData();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void afterCompletion(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
- /** Replication may need some extra controls to guarantee ordering
- * when nothing is persisted through the contexts
- * @return The context is empty
- */
- boolean isEmpty();
-
- void setEmpty(boolean empty);
-
-}
Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java (from rev 8302, branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of {@link IOCompletion} callbacks can be executed.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+ /** To be called by the replication manager, when new replication is added to the queue */
+ void linedUp();
+
+ boolean hasData();
+
+ void executeOnCompletion(IOCompletion runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
+
+ /** Replication may need some extra controls to guarantee ordering
+ * when nothing is persisted through the contexts
+ * @return The context is empty
+ */
+ boolean isEmpty();
+
+ void setEmpty(boolean empty);
+
+}
Added: branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+/**
+ * An OperationExceptionCallback
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationExceptionCallback
+{
+ void onError(int errorCode, String errorMessage);
+}
Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -1,139 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.completion.CompletionContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
- *
- */
-public class CompletionContextImpl implements CompletionContext
-{
- private static final ThreadLocal<CompletionContext> tlReplicationContext = new ThreadLocal<CompletionContext>();
-
- public static CompletionContext getContext()
- {
- CompletionContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new CompletionContextImpl();
- tlReplicationContext.set(token);
- }
- return token;
- }
-
- private List<Runnable> tasks;
-
- private int linedup = 0;
-
- private int replicated = 0;
-
- private boolean empty = false;
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public CompletionContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the queue */
- public void linedUp()
- {
- linedup++;
- }
-
- public boolean hasData()
- {
- return linedup > 0;
- }
-
- /** You may have several actions to be done after a replication operation is completed. */
- public void afterCompletion(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- if (++replicated == linedup && complete)
- {
- flush();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- tlReplicationContext.set(null);
- complete = true;
- if (replicated == linedup && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isEmpty()
- {
- return empty;
- }
-
- public void setEmpty(final boolean sync)
- {
- this.empty = sync;
- }
-
-}
Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java (from rev 8302, branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.journal.IOCompletion;
+
+/**
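+ * Default {@link OperationContext} implementation; {@link #getContext()} keeps one instance per thread in a ThreadLocal.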
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
+ *
+ */
+public class OperationContextImpl implements OperationContext
+{
+ private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
+
+ public static OperationContext getContext()
+ {
+ OperationContext token = tlContext.get();
+ if (token == null)
+ {
+ token = new OperationContextImpl();
+ tlContext.set(token);
+ }
+ return token;
+ }
+
+ private List<IOCompletion> tasks;
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean empty = false;
+
+ private volatile boolean complete = false;
+
+ /**
+ * Creates a context with no operations lined up and no completion tasks registered.
+ */
+ public OperationContextImpl()
+ {
+ super();
+ }
+
+ /** To be called by the replication manager, when new replication is added to the queue */
+ public void linedUp()
+ {
+ linedup++;
+ }
+
+ public boolean hasData()
+ {
+ return linedup > 0;
+ }
+
+ /** You may have several actions to be done after a replication operation is completed. */
+ public void executeOnCompletion(IOCompletion completion)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+ throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need to use Concurrent, we only add from a single thread.
+ // We don't add any more tasks after it is complete
+ tasks = new LinkedList<IOCompletion>();
+ }
+
+ tasks.add(completion);
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void done()
+ {
+ if (++replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.completion.OperationContext#complete()
+ */
+ public synchronized void complete()
+ {
+ tlContext.set(null);
+ complete = true;
+ if (replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (tasks != null)
+ {
+ for (IOCompletion run : tasks)
+ {
+ run.done();
+ }
+ tasks.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.completion.OperationContext#isEmpty()
+ */
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public void setEmpty(final boolean sync)
+ {
+ this.empty = sync;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public void onError(int errorCode, String errorMessage)
+ {
+ if (tasks != null)
+ {
+ for (IOCompletion run : tasks)
+ {
+ run.onError(errorCode, errorMessage);
+ }
+ }
+ }
+
+}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -25,5 +25,4 @@
*/
public interface IOCompletion extends AIOCallback
{
- void waitCompletion() throws Exception;
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/Journal.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -19,8 +19,10 @@
/**
*
- * A Journal
+ * Most methods on the journal provide a blocking version where you select the sync mode and a non blocking mode where you pass a completion callback as a parameter.
*
+ * Notice also that even on the callback methods it's possible to pass the sync mode. That will only make sense on the NIO operations.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -31,14 +33,24 @@
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
+ void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception;
+
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion completionCallback) throws Exception;
+
void appendDeleteRecord(long id, boolean sync) throws Exception;
+ void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception;
+
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
@@ -57,6 +69,8 @@
void appendCommitRecord(long txID, boolean sync) throws Exception;
+ void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
+
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -70,10 +84,16 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback) throws Exception;
+
void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception;
+
// Load
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -229,7 +229,7 @@
public int read(final ByteBuffer bytes) throws Exception
{
- IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
int bytesRead = read(bytes, waitCompletion);
@@ -281,7 +281,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
writeDirect(bytes, true, completion);
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -178,7 +178,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
write(bytes, true, completion);
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -24,13 +23,13 @@
*
*
*/
-public class DummyCallback implements IOCompletion
+class DummyCallback extends SyncIOCompletion
{
private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
- public static IOCompletion getInstance()
+ public static DummyCallback getInstance()
{
return instance;
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -845,15 +845,31 @@
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -864,8 +880,6 @@
writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -882,11 +896,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
@@ -894,15 +903,31 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendUpdateRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -924,8 +949,6 @@
writeUpdateRecord(-1, id, recordType, record, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -951,14 +974,23 @@
{
compactingLock.readLock().unlock();
}
+ }
+
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendDeleteRecord(id, sync, callback);
+
+ // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+
+ public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -967,8 +999,6 @@
compactingLock.readLock().lock();
- IOCompletion callback = null;
-
try
{
@@ -988,8 +1018,6 @@
writeDeleteRecord(-1, id, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -1016,11 +1044,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -1165,6 +1188,12 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion completion) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
@@ -1174,6 +1203,18 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -1187,7 +1228,7 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion completion) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1198,11 +1239,6 @@
JournalTransaction tx = getTransactionInfo(txID);
- if (sync)
- {
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
try
{
@@ -1214,7 +1250,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, completion);
tx.prepare(usedFile);
}
@@ -1228,11 +1264,23 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throughput
- tx.waitCompletion();
}
+
+
+
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendCommitRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
@@ -1250,7 +1298,9 @@
*
* @see JournalImpl#writeTransaction(byte, long, org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1282,7 +1332,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1296,15 +1346,23 @@
{
compactingLock.readLock().unlock();
}
+ }
- if (sync)
+
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendRollbackRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
{
- // We should wait this outside of the lock, to increase throuput
- tx.waitCompletion();
+ syncCompletion.waitCompletion();
}
+
}
-
- public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+
+ public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion completion) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1331,7 +1389,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, completion);
tx.rollback(usedFile);
}
@@ -1345,14 +1403,6 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throuput
-
- if (sync)
- {
- tx.waitCompletion();
- }
-
}
public int getAlignment() throws Exception
@@ -2833,7 +2883,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- IOCompletion callback) throws Exception
+ final IOCompletion parameterCallback) throws Exception
{
try
{
@@ -2841,6 +2891,8 @@
{
throw new IllegalStateException("The journal is not loaded " + state);
}
+
+ final IOCompletion callback;
int size = bb.capacity();
@@ -2874,25 +2926,26 @@
if (tx != null)
{
- if (callback != null)
- {
- // sanity check, it should not happen.
- throw new IllegalArgumentException("Invalid callback parameter. Use of tx is mutually exclusive with the callback");
- }
-
// The callback of a transaction has to be taken inside the lock,
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
if (fileFactory.isSupportsCallbacks())
{
- callback = tx.getCallback(currentFile);
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ txcallback.setDelegateCompletion(parameterCallback);
+ callback = txcallback;
}
+ else
+ {
+ callback = null;
+ }
if (sync)
{
- // 99 % of the times this will be already synced, as previous files should be closed already.
- // This is to have 100% guarantee the transaction will be persisted and no loss of information would
- // happen
+ // In an edge case the transaction could still have pending data in previous files.
+ // Syncing them here shouldn't cause any blocking issues; it is only a safeguard to
+ // guarantee that, in every case, the transaction's data is on the disk
tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
@@ -2903,6 +2956,10 @@
tx.fillNumberOfRecords(currentFile, bb);
}
}
+ else
+ {
+ callback = parameterCallback;
+ }
// Adding fileID
bb.writerIndex(DataConstants.SIZE_BYTE);
@@ -3233,13 +3290,13 @@
return tx;
}
- private IOCompletion getSyncCallback(final boolean sync)
+ private SyncIOCompletion getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
- return SimpleWaitIOCallback.getInstance();
+ return new SimpleWaitIOCallback();
}
else
{
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -14,9 +14,9 @@
package org.hornetq.core.journal.impl;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -26,7 +26,7 @@
*
*
*/
-public class SimpleWaitIOCallback implements IOCompletion
+public class SimpleWaitIOCallback extends SyncIOCompletion
{
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,12 +37,6 @@
private volatile int errorCode = 0;
- public static IOCompletion getInstance()
- {
- return new SimpleWaitIOCallback();
- }
-
-
public void done()
{
latch.countDown();
@@ -68,4 +62,9 @@
}
return;
}
+
+ public boolean waitCompletion(final long timeout) throws Exception
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
}
Added: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public abstract void waitCompletion() throws Exception;
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -31,15 +31,27 @@
private volatile String errorMessage = null;
private volatile int errorCode = 0;
+
+ private volatile int up = 0;
+
+ private volatile int done = 0;
+
+ private volatile IOCompletion delegateCompletion;
public void countUp()
{
+ up++;
countLatch.up();
}
public void done()
{
countLatch.down();
+ if (++done == up && delegateCompletion != null)
+ {
+ delegateCompletion.done();
+ delegateCompletion = null;
+ }
}
public void waitCompletion() throws InterruptedException
@@ -59,9 +71,30 @@
this.errorCode = errorCode;
countLatch.down();
+
+ if (delegateCompletion != null)
+ {
+ delegateCompletion.onError(errorCode, errorMessage);
+ }
}
/**
+ * @return the delegateCompletion
+ */
+ public IOCompletion getDelegateCompletion()
+ {
+ return delegateCompletion;
+ }
+
+ /**
+ * @param delegateCompletion the delegateCompletion to set
+ */
+ public void setDelegateCompletion(IOCompletion delegateCompletion)
+ {
+ this.delegateCompletion = delegateCompletion;
+ }
+
+ /**
* @return the errorMessage
*/
public String getErrorMessage()
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -997,8 +997,7 @@
depageTransaction.commit();
- // StorageManager does the check: if (replicated) -> do the proper cleanup already
- storageManager.completeReplication();
+ storageManager.completeOperations();
if (isTrace)
{
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -18,6 +18,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -56,13 +57,14 @@
boolean isReplicated();
- void afterCompletion(Runnable run);
+ void afterCompleteOperations(IOCompletion run);
/** Block until the replication is done.
* @throws Exception */
- void waitOnReplication(long timeout) throws Exception;
+ void waitOnOperations(long timeout) throws Exception;
- void completeReplication();
+ /** To close the OperationsContext */
+ void completeOperations();
UUID getPersistentID();
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -33,11 +33,12 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -48,6 +49,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -292,7 +294,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
if (replicator != null)
{
@@ -308,19 +310,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
- public void waitOnReplication(final long timeout) throws Exception
+ public void waitOnOperations(final long timeout) throws Exception
{
- final CountDownLatch latch = new CountDownLatch(1);
- afterCompletion(new Runnable()
+ SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+ afterCompleteOperations(waitCallback);
+ if (!waitCallback.waitCompletion(timeout))
{
- public void run()
- {
- latch.countDown();
- }
- });
- completeReplication();
- if (!latch.await(timeout, TimeUnit.MILLISECONDS))
- {
throw new IllegalStateException("no response received from replication");
}
}
@@ -364,9 +359,9 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterCompletion(Runnable run)
+ public void afterCompleteOperations(IOCompletion run)
{
- CompletionContextImpl.getContext().afterCompletion(run);
+ OperationContextImpl.getContext().executeOnCompletion(run);
}
public UUID getPersistentID()
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -20,6 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -282,14 +283,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
- */
- public void afterCompletion(Runnable run)
- {
- run.run();
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#isReplicated()
*/
public boolean isReplicated()
@@ -300,7 +293,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -336,7 +329,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -348,4 +341,12 @@
throw new IllegalStateException("Null Persistence should never be used as replicated");
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOCompletion run)
+ {
+ run.done();
+ }
+
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -924,20 +924,13 @@
}
else
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- addReferences(refs);
- }
- });
- }
- else
- {
- addReferences(refs);
- }
+ addReferences(refs);
+ }
+ });
}
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -15,7 +15,7 @@
import java.util.Set;
-import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.OperationContext;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -53,7 +53,7 @@
void closeContext();
/** A list of tokens that are still waiting for replications to be completed */
- Set<CompletionContext> getActiveTokens();
+ Set<OperationContext> getActiveTokens();
/**
* @param storeName
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -22,8 +22,8 @@
import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.FailoverManager;
-import org.hornetq.core.completion.CompletionContext;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.OperationContext;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -82,7 +82,7 @@
private final Object replicationLock = new Object();
- private final Queue<CompletionContext> pendingTokens = new ConcurrentLinkedQueue<CompletionContext>();
+ private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
// Static --------------------------------------------------------
@@ -359,17 +359,17 @@
{
enabled = false;
- LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
+ LinkedHashSet<OperationContext> activeContexts = new LinkedHashSet<OperationContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
while (!pendingTokens.isEmpty())
{
- CompletionContext ctx = pendingTokens.poll();
+ OperationContext ctx = pendingTokens.poll();
activeContexts.add(ctx);
}
- for (CompletionContext ctx : activeContexts)
+ for (OperationContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
@@ -397,7 +397,7 @@
*/
public void closeContext()
{
- final CompletionContext token = getContext();
+ final OperationContext token = getContext();
if (token != null)
{
@@ -414,15 +414,15 @@
/* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public Set<CompletionContext> getActiveTokens()
+ public Set<OperationContext> getActiveTokens()
{
- LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
+ LinkedHashSet<OperationContext> activeContexts = new LinkedHashSet<OperationContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
- for (CompletionContext ctx : pendingTokens)
+ for (OperationContext ctx : pendingTokens)
{
activeContexts.add(ctx);
}
@@ -435,7 +435,7 @@
{
boolean runItNow = false;
- CompletionContext repliToken = getContext();
+ OperationContext repliToken = getContext();
repliToken.linedUp();
synchronized (replicationLock)
@@ -458,7 +458,7 @@
if (runItNow)
{
- repliToken.replicated();
+ repliToken.done();
}
}
@@ -472,11 +472,11 @@
private void replicated()
{
- List<CompletionContext> tokensToExecute = getTokens();
+ List<OperationContext> tokensToExecute = getTokens();
- for (CompletionContext ctx : tokensToExecute)
+ for (OperationContext ctx : tokensToExecute)
{
- ctx.replicated();
+ ctx.done();
}
}
@@ -486,7 +486,7 @@
// Private -------------------------------------------------------
- private void sync(CompletionContext context)
+ private void sync(OperationContext context)
{
boolean executeNow = false;
synchronized (replicationLock)
@@ -507,14 +507,14 @@
}
if (executeNow)
{
- context.replicated();
+ context.done();
}
}
- public CompletionContext getContext()
+ public OperationContext getContext()
{
- return CompletionContextImpl.getContext();
+ return OperationContextImpl.getContext();
}
/**
@@ -523,11 +523,11 @@
* At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
* @return
*/
- private List<CompletionContext> getTokens()
+ private List<OperationContext> getTokens()
{
- List<CompletionContext> retList = new LinkedList<CompletionContext>();
+ List<OperationContext> retList = new LinkedList<OperationContext>();
- CompletionContext tokenPolled = null;
+ OperationContext tokenPolled = null;
// First will get all the non replicated tokens up to the first one that is not replicated
do
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -144,21 +144,14 @@
tx.commit();
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- execPrompter();
- }
- });
- storageManager.completeReplication();
- }
- else
- {
- execPrompter();
- }
+ execPrompter();
+ }
+ });
+ storageManager.completeOperations();
}
private void execPrompter()
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -87,10 +87,7 @@
}
newList.add(groupBinding);
storageManager.addGrouping(groupBinding);
- if (storageManager.isReplicated())
- {
- storageManager.waitOnReplication(timeout);
- }
+ storageManager.waitOnOperations(timeout);
return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -1718,23 +1718,16 @@
final boolean flush,
final boolean closeChannel)
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new Runnable()
{
- storageManager.afterCompletion(new Runnable()
+ public void run()
{
- public void run()
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ doSendResponse(confirmPacket, response, flush, closeChannel);
+ }
- });
-
- storageManager.completeReplication();
- }
- else
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ });
+
+ storageManager.completeOperations();
}
/**
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -98,7 +98,7 @@
{
this.containsPersistent = true;
}
-
+
public long getID()
{
return id;
@@ -209,53 +209,35 @@
}
}
- Runnable execAfterCommit = null;
-
- if (operations != null)
- {
- execAfterCommit = new Runnable()
- {
- public void run()
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterCommit(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- log.warn(e.getMessage(), e);
- }
- }
- }
- };
- }
-
if (containsPersistent || (xid != null && state == State.PREPARED))
{
storageManager.commit(id);
state = State.COMMITTED;
+ }
- if (execAfterCommit != null)
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new Runnable()
+ {
+ public void run()
{
- if (storageManager.isReplicated())
+ for (TransactionOperation operation : operations)
{
- storageManager.afterCompletion(execAfterCommit);
+ try
+ {
+ operation.afterCommit(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
}
- else
- {
- execAfterCommit.run();
- }
}
- }
- else if (execAfterCommit != null)
- {
- execAfterCommit.run();
- }
+ });
}
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -92,7 +92,7 @@
}
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
- boolean durable = true;
+ boolean durable = false;
for (int i = 0; i < NUM; i++)
{
ClientMessage msg = session.createClientMessage(durable);
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -32,7 +32,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.client.impl.FailoverManagerImpl;
-import org.hornetq.core.completion.impl.CompletionContextImpl;
+import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -375,7 +375,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
{
@@ -402,7 +402,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -458,7 +458,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -468,22 +468,10 @@
});
- assertEquals(1, manager.getActiveTokens().size());
-
manager.closeContext();
assertTrue(latch.await(1, TimeUnit.SECONDS));
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -530,7 +518,7 @@
}
- CompletionContextImpl.getContext().afterCompletion(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new Runnable()
{
public void run()
@@ -552,17 +540,6 @@
assertEquals(i, executions.get(i).intValue());
}
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17 21:40:56 UTC (rev 8302)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-18 02:52:31 UTC (rev 8303)
@@ -1155,7 +1155,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterCompletion(Runnable run)
+ public void afterCompleteOperations(Runnable run)
{
}
@@ -1163,7 +1163,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -1221,7 +1221,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
15 years, 1 month
JBoss hornetq SVN: r8302 - in branches/ClebertTemporary: src/main/org/hornetq/core/completion and 12 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 16:40:56 -0500 (Tue, 17 Nov 2009)
New Revision: 8302
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/
branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java
Removed:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
another iteration
Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java (from rev 8301, branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/CompletionContext.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface CompletionContext
+{
+ /** To be called by the replication manager, when new replication is added to the queue */
+ void linedUp();
+
+ boolean hasData();
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ void replicated();
+
+ void afterCompletion(Runnable runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
+
+ /** Replication may need some extra controls to guarantee ordering
+ * when nothing is persisted through the contexts
+ * @return The context is empty
+ */
+ boolean isEmpty();
+
+ void setEmpty(boolean empty);
+
+}
Copied: branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java (from rev 8301, branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/CompletionContextImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.completion.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.completion.CompletionContext;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
+ *
+ */
+public class CompletionContextImpl implements CompletionContext
+{
+ private static final ThreadLocal<CompletionContext> tlReplicationContext = new ThreadLocal<CompletionContext>();
+
+ public static CompletionContext getContext()
+ {
+ CompletionContext token = tlReplicationContext.get();
+ if (token == null)
+ {
+ token = new CompletionContextImpl();
+ tlReplicationContext.set(token);
+ }
+ return token;
+ }
+
+ private List<Runnable> tasks;
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean empty = false;
+
+ private volatile boolean complete = false;
+
+ /**
+ * @param executor
+ */
+ public CompletionContextImpl()
+ {
+ super();
+ }
+
+ /** To be called by the replication manager, when new replication is added to the queue */
+ public void linedUp()
+ {
+ linedup++;
+ }
+
+ public boolean hasData()
+ {
+ return linedup > 0;
+ }
+
+ /** You may have several actions to be done after a replication operation is completed. */
+ public void afterCompletion(Runnable runnable)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+ throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need to use Concurrent, we only add from a single thread.
+ // We don't add any more Runnables after it is complete
+ tasks = new ArrayList<Runnable>();
+ }
+
+ tasks.add(runnable);
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void replicated()
+ {
+ if (++replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public synchronized void complete()
+ {
+ tlReplicationContext.set(null);
+ complete = true;
+ if (replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (tasks != null)
+ {
+ for (Runnable run : tasks)
+ {
+ run.run();
+ }
+ tasks.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+ */
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public void setEmpty(final boolean sync)
+ {
+ this.empty = sync;
+ }
+
+}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -56,7 +56,7 @@
boolean isReplicated();
- void afterReplicated(Runnable run);
+ void afterCompletion(Runnable run);
/** Block until the replication is done.
* @throws Exception */
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -33,6 +33,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
@@ -310,7 +311,7 @@
public void waitOnReplication(final long timeout) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- afterReplicated(new Runnable()
+ afterCompletion(new Runnable()
{
public void run()
{
@@ -363,13 +364,9 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
- if (replicator == null)
- {
- throw new IllegalStateException("StorageManager is not replicated");
- }
- replicator.afterReplicated(run);
+ CompletionContextImpl.getContext().afterCompletion(run);
}
public UUID getPersistentID()
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -284,7 +284,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
run.run();
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -926,7 +926,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationContext
-{
- /** To be called by the replication manager, when new replication is added to the queue */
- void linedUp();
-
- boolean hasReplication();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void addReplicationAction(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
- boolean isSync();
-
- void setSync(boolean sync);
-
-}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -15,6 +15,7 @@
import java.util.Set;
+import org.hornetq.core.completion.CompletionContext;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -49,13 +50,10 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
- /** Add an action to be executed after the pending replications */
- void afterReplicated(Runnable runnable);
-
void closeContext();
/** A list of tokens that are still waiting for replications to be completed */
- Set<ReplicationContext> getActiveTokens();
+ Set<CompletionContext> getActiveTokens();
/**
* @param storeName
Deleted: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -1,126 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.replication.ReplicationContext;
-
-/**
- * A ReplicationToken
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationContextImpl implements ReplicationContext
-{
- private List<Runnable> tasks;
-
- private int linedup = 0;
-
- private int replicated = 0;
-
- private boolean sync = false;
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public ReplicationContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the queue */
- public void linedUp()
- {
- linedup++;
- }
-
- public boolean hasReplication()
- {
- return linedup > 0;
- }
-
- /** You may have several actions to be done after a replication operation is completed. */
- public void addReplicationAction(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- // roundtrip packets won't have lined up packets
- if (++replicated == linedup && complete)
- {
- flush();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- complete = true;
- if (replicated == linedup && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isSync()
- {
- return sync;
- }
-
- public void setSync(final boolean sync)
- {
- this.sync = sync;
- }
-
-}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -13,14 +13,17 @@
package org.hornetq.core.replication.impl;
-import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.completion.CompletionContext;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
@@ -45,9 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.SimpleString;
/**
@@ -81,10 +82,8 @@
private final Object replicationLock = new Object();
- private final ThreadLocal<ReplicationContext> tlReplicationContext = new ThreadLocal<ReplicationContext>();
+ private final Queue<CompletionContext> pendingTokens = new ConcurrentLinkedQueue<CompletionContext>();
- private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -360,17 +359,17 @@
{
enabled = false;
- LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+ LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
while (!pendingTokens.isEmpty())
{
- ReplicationContext ctx = pendingTokens.poll();
+ CompletionContext ctx = pendingTokens.poll();
activeContexts.add(ctx);
}
- for (ReplicationContext ctx : activeContexts)
+ for (CompletionContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
@@ -393,38 +392,17 @@
started = false;
}
- public ReplicationContext getContext()
- {
- ReplicationContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new ReplicationContextImpl();
- tlReplicationContext.set(token);
- }
- return token;
- }
-
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
- */
- public void afterReplicated(final Runnable runnable)
- {
- getContext().addReplicationAction(runnable);
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#completeToken()
*/
public void closeContext()
{
- final ReplicationContext token = tlReplicationContext.get();
+ final CompletionContext token = getContext();
if (token != null)
{
- // Disassociate thread local
- tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
- if (!token.hasReplication())
+ if (!token.hasData())
{
sync(token);
}
@@ -432,18 +410,19 @@
}
}
+
/* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public Set<ReplicationContext> getActiveTokens()
+ public Set<CompletionContext> getActiveTokens()
{
- LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+ LinkedHashSet<CompletionContext> activeContexts = new LinkedHashSet<CompletionContext>();
// The same context will be replicated on the pending tokens...
// as the multiple operations will be replicated on the same context
- for (ReplicationContext ctx : pendingTokens)
+ for (CompletionContext ctx : pendingTokens)
{
activeContexts.add(ctx);
}
@@ -456,7 +435,7 @@
{
boolean runItNow = false;
- ReplicationContext repliToken = getContext();
+ CompletionContext repliToken = getContext();
repliToken.linedUp();
synchronized (replicationLock)
@@ -493,9 +472,9 @@
private void replicated()
{
- ArrayList<ReplicationContext> tokensToExecute = getTokens();
+ List<CompletionContext> tokensToExecute = getTokens();
- for (ReplicationContext ctx : tokensToExecute)
+ for (CompletionContext ctx : tokensToExecute)
{
ctx.replicated();
}
@@ -507,13 +486,13 @@
// Private -------------------------------------------------------
- private void sync(ReplicationContext context)
+ private void sync(CompletionContext context)
{
boolean executeNow = false;
synchronized (replicationLock)
{
context.linedUp();
- context.setSync(true);
+ context.setEmpty(true);
if (pendingTokens.isEmpty())
{
// this means the list is empty and we should process it now
@@ -532,6 +511,11 @@
}
}
+
+ public CompletionContext getContext()
+ {
+ return CompletionContextImpl.getContext();
+ }
/**
* This method will first get all the sync tokens (that won't go to the backup node)
@@ -539,11 +523,11 @@
* At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
* @return
*/
- private ArrayList<ReplicationContext> getTokens()
+ private List<CompletionContext> getTokens()
{
- ArrayList<ReplicationContext> retList = new ArrayList<ReplicationContext>(1);
+ List<CompletionContext> retList = new LinkedList<CompletionContext>();
- ReplicationContext tokenPolled = null;
+ CompletionContext tokenPolled = null;
// First will get all the non replicated tokens up to the first one that is not replicated
do
@@ -558,16 +542,16 @@
retList.add(tokenPolled);
}
- while (tokenPolled.isSync());
+ while (tokenPolled.isEmpty());
// This is to avoid a situation where we won't have more replicated packets
- // all the packets will need to be processed
+ // We need to make sure we process any pending sync packet up to the next non empty packet
synchronized (replicationLock)
{
- while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isEmpty())
{
tokenPolled = pendingTokens.poll();
- if (!tokenPolled.isSync())
+ if (!tokenPolled.isEmpty())
{
throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -146,7 +146,7 @@
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -1720,7 +1720,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(new Runnable()
+ storageManager.afterCompletion(new Runnable()
{
public void run()
{
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -244,7 +244,7 @@
{
if (storageManager.isReplicated())
{
- storageManager.afterReplicated(execAfterCommit);
+ storageManager.afterCompletion(execAfterCommit);
}
else
{
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -32,6 +32,7 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.client.impl.FailoverManagerImpl;
+import org.hornetq.core.completion.impl.CompletionContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -272,20 +273,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- assertEquals(1, manager.getActiveTokens().size());
-
blockOnReplication(manager);
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
ServerMessage msg = new ServerMessageImpl();
@@ -386,7 +375,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
{
@@ -413,7 +402,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
@@ -469,7 +458,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
@@ -541,7 +530,7 @@
}
- manager.afterReplicated(new Runnable()
+ CompletionContextImpl.getContext().afterCompletion(new Runnable()
{
public void run()
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17 20:01:28 UTC (rev 8301)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-17 21:40:56 UTC (rev 8302)
@@ -1155,7 +1155,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompletion(Runnable run)
{
}
15 years, 1 month