JBoss hornetq SVN: r8022 - in trunk: tests/src/org/hornetq/tests/integration/cluster/reattach and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-01 05:09:08 -0400 (Thu, 01 Oct 2009)
New Revision: 8022
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
fixed hanging test
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-01 08:27:04 UTC (rev 8021)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-10-01 09:09:08 UTC (rev 8022)
@@ -347,7 +347,7 @@
if (sendSemaphore != null)
{
//Any threads blocking on the send semaphore should be allowed to return
- sendSemaphore.release(Integer.MAX_VALUE);
+ sendSemaphore.release(Integer.MAX_VALUE - sendSemaphore.availablePermits());
}
if (!connection.isDestroyed() && !connection.removeChannel(id))
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-01 08:27:04 UTC (rev 8021)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-10-01 09:09:08 UTC (rev 8022)
@@ -416,6 +416,8 @@
t.start();
+ log.info("Failing connection");
+
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
session.start();
14 years, 7 months
JBoss hornetq SVN: r8021 - in trunk: src/main/org/hornetq/core/config/impl and 2 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-01 04:27:04 -0400 (Thu, 01 Oct 2009)
New Revision: 8021
Modified:
trunk/docs/user-manual/en/transaction-config.xml
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
Log:
HORNETQ-106: reenable transaction timeouts
* in ResourceManagerImpl, reenabled timeouts to rollback *unprepared* transaction branches
- timeout is computed against the branch creation time (as specified in OTS)
* changed default transaction timeout to 5 minutes (300000 ms)
* uncommented XA timeout tests
* updated documentation
Modified: trunk/docs/user-manual/en/transaction-config.xml
===================================================================
--- trunk/docs/user-manual/en/transaction-config.xml 2009-10-01 07:57:16 UTC (rev 8020)
+++ trunk/docs/user-manual/en/transaction-config.xml 2009-10-01 08:27:04 UTC (rev 8021)
@@ -24,9 +24,9 @@
started but then forgotten about. Maybe the client died and never came back. If this happens
then the transaction will just sit there indefinitely.</para>
<para>To cope with this HornetQ can, if configured, scan for old transactions and rollback any
- it finds. The default for this is 60000 milliseconds (1 minute), i.e. any transactions older
- than 60 seconds are removed, however this can be changed by editing the <literal
- >transaction-timeout</literal> property in <literal>hornetq-configuration.xml</literal>.
+ it finds. The default for this is 300000 milliseconds (5 minutes), i.e. any transactions older
+ than 5 minutes are removed. This timeout can be changed by editing the <literal
+ >transaction-timeout</literal> property in <literal>hornetq-configuration.xml</literal> (value must be in milliseconds).
The property <literal>transaction-timeout-scan-period</literal> configures how often, in
milliseconds, to scan for old transactions.</para>
</chapter>
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-01 07:57:16 UTC (rev 8020)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-10-01 08:27:04 UTC (rev 8021)
@@ -116,7 +116,7 @@
public static final int DEFAULT_MESSAGE_COUNTER_MAX_DAY_HISTORY = 10;
- public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
+ public static final long DEFAULT_TRANSACTION_TIMEOUT = 300000; // 5 minutes
public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 07:57:16 UTC (rev 8020)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 08:27:04 UTC (rev 8021)
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -52,12 +53,12 @@
private boolean started = false;
- //private TxTimeoutHandler task;
+ private TxTimeoutHandler task;
private final long txTimeoutScanPeriod;
- //private final ScheduledExecutorService scheduledThreadPool;
-
+ private final ScheduledExecutorService scheduledThreadPool;
+
public ResourceManagerImpl(final int defaultTimeoutSeconds,
final long txTimeoutScanPeriod,
final ScheduledExecutorService scheduledThreadPool)
@@ -65,7 +66,7 @@
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
this.timeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
- // this.scheduledThreadPool = scheduledThreadPool;
+ this.scheduledThreadPool = scheduledThreadPool;
}
// HornetQComponent implementation
@@ -76,10 +77,9 @@
{
return;
}
- //todo - https://jira.jboss.org/jira/browse/HORNETQ-106
- /*task = new TxTimeoutHandler();
+ task = new TxTimeoutHandler();
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);
- task.setFuture(future);*/
+ task.setFuture(future);
started = true;
}
@@ -90,10 +90,10 @@
{
return;
}
- /*if (task != null)
+ if (task != null)
{
task.close();
- }*/
+ }
started = false;
}
@@ -137,7 +137,7 @@
this.timeoutSeconds = timeoutSeconds;
}
- return false;
+ return true;
}
public List<Xid> getPreparedTransactions()
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-10-01 07:57:16 UTC (rev 8020)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-10-01 08:27:04 UTC (rev 8021)
@@ -65,12 +65,8 @@
private SimpleString atestq = new SimpleString("atestq");
- public void test()
+ protected void setUp() throws Exception
{
-
- }
- /*protected void setUp() throws Exception
- {
super.setUp();
addressSettings.clear();
@@ -559,5 +555,5 @@
{
return Collections.emptySet();
}
- }*/
+ }
}
14 years, 7 months
JBoss hornetq SVN: r8020 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-01 03:57:16 -0400 (Thu, 01 Oct 2009)
New Revision: 8020
Modified:
trunk/docs/user-manual/en/management.xml
Log:
HORNETQ-33: Record list of heuristically committed/rolledback transaction branches
* documented management methods to list heuristically completed transactions
Modified: trunk/docs/user-manual/en/management.xml
===================================================================
--- trunk/docs/user-manual/en/management.xml 2009-10-01 07:47:34 UTC (rev 8019)
+++ trunk/docs/user-manual/en/management.xml 2009-10-01 07:57:16 UTC (rev 8020)
@@ -106,7 +106,9 @@
Strings.) To commit or rollback a given prepared transaction, the <literal
>commitPreparedTransaction</literal>() or <literal
>rollbackPreparedTransaction()</literal> method can be used to resolve
- heuristic transactions.</para>
+ heuristic transactions. Heuristically completed transactions can be listed using
+ the <literal>listHeuristicCommittedTransactions()</literal> and
+ <literal>listHeuristicRolledBackTransactions()</literal> methods.</para>
</listitem>
<listitem>
<para>Enabling and resetting Message counters</para>
14 years, 7 months
JBoss hornetq SVN: r8019 - in trunk: src/main/org/hornetq/core/management/impl and 11 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-10-01 03:47:34 -0400 (Thu, 01 Oct 2009)
New Revision: 8019
Modified:
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-33: Record list of heuristically committed/rolledback transaction branches
* store heuristic completion in the journal
* added management methods to list heuristically completed transactions
* fixed XAResource.recover() implementation to return them in addition to prepared branches
* implemented XAResource.forget() to delete the heuristic completions from the journal
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -168,6 +168,10 @@
@Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
String[] listPreparedTransactions() throws Exception;
+ String[] listHeuristicCommittedTransactions() throws Exception;
+
+ String[] listHeuristicRolledBackTransactions() throws Exception;
+
@Operation(desc = "Commit a prepared transaction")
boolean commitPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -394,6 +394,30 @@
}
return s;
}
+
+ public String[] listHeuristicCommittedTransactions()
+ {
+ List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
+ }
+
+ public String[] listHeuristicRolledBackTransactions()
+ {
+ List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
+ }
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
{
@@ -405,6 +429,8 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.commit();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+ resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
}
}
@@ -421,6 +447,8 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.rollback();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+ resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
}
}
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -96,6 +96,10 @@
void deletePageTransactional(long txID, long recordID) throws Exception;
+ long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
+
+ void deleteHeuristicCompletion(long id) throws Exception;
+
void loadMessageJournal(PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -112,6 +112,8 @@
public static final byte DUPLICATE_ID = 37;
+ public static final byte HEURISTIC_COMPLETION = 38;
+
private UUID persistentID;
private final BatchingIDGenerator idGenerator;
@@ -378,6 +380,18 @@
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
}
+ public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ {
+ long id = generateUniqueID();
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true);
+ return id;
+ }
+
+ public void deleteHeuristicCompletion(long id) throws Exception
+ {
+ messageJournal.appendDeleteRecord(id, true);
+ }
+
public void deletePageTransactional(final long txID, final long recordID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -686,6 +700,13 @@
break;
}
+ case HEURISTIC_COMPLETION:
+ {
+ HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
+ encoding.decode(buff);
+ resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
+ break;
+ }
default:
{
throw new IllegalStateException("Invalid record type " + recordType);
@@ -1208,7 +1229,40 @@
return XidCodecSupport.getXidEncodeLength(xid);
}
}
+
+ private static class HeuristicCompletionEncoding implements EncodingSupport
+ {
+ Xid xid;
+ boolean isCommit;
+
+ HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
+ {
+ this.xid = xid;
+ this.isCommit = isCommit;
+ }
+ HeuristicCompletionEncoding()
+ {
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ xid = XidCodecSupport.decodeXid(buffer);
+ isCommit = buffer.readBoolean();
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ XidCodecSupport.encodeXid(xid, buffer);
+ buffer.writeBoolean(isCommit);
+ }
+
+ public int getEncodeSize()
+ {
+ return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
+ }
+ }
+
private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
long persistenceID;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -164,6 +164,15 @@
final long recordID) throws Exception
{
}
+
+ public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
+ {
+ return generateUniqueID();
+ }
+
+ public void deleteHeuristicCompletion(final long txID) throws Exception
+ {
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -782,9 +782,20 @@
if (theTx == null)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has been heuristically committed: " + xid);
+ }
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has been heuristically rolled back: " + xid);
+ }
+ else
+ {
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+ }
}
else
{
@@ -901,11 +912,26 @@
public void handleXAForget(final SessionXAForgetMessage packet)
{
- // Do nothing since we don't support heuristic commits / rollback from the
- // resource manager
+ long id = resourceManager.removeHeuristicCompletion(packet.getXid());
+ int code = XAResource.XA_OK;
+ if (id != -1)
+ {
+ try
+ {
+ storageManager.deleteHeuristicCompletion(id);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ code = XAException.XAER_RMERR;
+ }
+ } else
+ {
+ code = XAException.XAER_NOTA;
+ }
+
+ Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code, null);
- Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-
channel.confirm(packet);
channel.send(response);
@@ -1044,9 +1070,20 @@
if (theTx == null)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has ben heuristically committed: " + xid);
+ }
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has ben heuristically rolled back: " + xid);
+ }
+ else
+ {
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+ }
}
else
{
@@ -1249,8 +1286,12 @@
public void handleGetInDoubtXids(final Packet packet)
{
- Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
-
+ List<Xid> indoubtsXids = new ArrayList<Xid>();
+ indoubtsXids.addAll(resourceManager.getPreparedTransactions());
+ indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
+ indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+ Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+
channel.confirm(packet);
channel.send(response);
Modified: trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -43,4 +43,13 @@
List<Xid> getPreparedTransactions();
Map<Xid, Long> getPreparedTransactionsWithCreationTime();
+
+ void putHeuristicCompletion(long txid, Xid xid, boolean b);
+
+ long removeHeuristicCompletion(Xid xid);
+
+ List<Xid> getHeuristicCommittedTransactions();
+
+ List<Xid> getHeuristicRolledbackTransactions();
+
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -23,7 +24,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -44,6 +44,8 @@
private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<Xid, Transaction>();
+ private List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<HeuristicCompletionHolder>();
+
private final int defaultTimeoutSeconds;
private volatile int timeoutSeconds;
@@ -163,7 +165,50 @@
}
return xidsWithCreationTime;
}
+
+ public void putHeuristicCompletion(final long recordID, final Xid xid, final boolean isCommit)
+ {
+ heuristicCompletions.add(new HeuristicCompletionHolder(recordID, xid, isCommit));
+ }
+
+ public List<Xid>getHeuristicCommittedTransactions()
+ {
+ return getHeuristicCompletedTransactions(true);
+ }
+
+ public List<Xid>getHeuristicRolledbackTransactions()
+ {
+ return getHeuristicCompletedTransactions(false);
+ }
+ public long removeHeuristicCompletion(Xid xid)
+ {
+ Iterator<HeuristicCompletionHolder> iterator = heuristicCompletions.iterator();
+ while (iterator.hasNext())
+ {
+ ResourceManagerImpl.HeuristicCompletionHolder holder = (ResourceManagerImpl.HeuristicCompletionHolder)iterator.next();
+ if (holder.xid.equals(xid))
+ {
+ iterator.remove();
+ return holder.recordID;
+ }
+ }
+ return -1;
+ }
+
+ private List<Xid>getHeuristicCompletedTransactions(boolean isCommit)
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+ for (HeuristicCompletionHolder holder : heuristicCompletions)
+ {
+ if (holder.isCommit == isCommit)
+ {
+ xids.add(holder.xid);
+ }
+ }
+ return xids;
+ }
+
class TxTimeoutHandler implements Runnable
{
private boolean closed = false;
@@ -220,4 +265,18 @@
}
}
+
+ private class HeuristicCompletionHolder
+ {
+ public final boolean isCommit;
+ public final Xid xid;
+ public final long recordID;
+
+ public HeuristicCompletionHolder(long recordID, Xid xid, boolean isCommit)
+ {
+ this.recordID = recordID;
+ this.xid = xid;
+ this.isCommit = isCommit;
+ }
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -138,6 +138,9 @@
System.out.println(preparedTransactions[0]);
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+
if (isCommit)
{
jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
@@ -147,6 +150,109 @@
jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
}
+ assertEquals(0, jmxServer.listPreparedTransactions().length);
+ if (isCommit)
+ {
+ assertEquals(1, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(1, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+ if (isCommit)
+ {
+ assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(123, msg.getBodySize());
+
+ session.commit();
+ session.close();
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+
+
+
+ }
+
+ public void testHeuristicCommitWithRestart() throws Exception
+ {
+ doHeuristicCompletionWithRestart(true);
+ }
+
+ public void testHeuristicRollbackWithRestart() throws Exception
+ {
+ doHeuristicCompletionWithRestart(false);
+ }
+
+ private void doHeuristicCompletionWithRestart(final boolean isCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (isCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
preparedTransactions = jmxServer.listPreparedTransactions();
assertEquals(0, preparedTransactions.length);
@@ -169,6 +275,22 @@
assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+ server.stop();
+
+ server.start();
+
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (isCommit)
+ {
+ String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+ assertEquals(1, listHeuristicCommittedTransactions.length);
+ System.out.println(listHeuristicCommittedTransactions[0]);
+ } else
+ {
+ String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+ assertEquals(1, listHeuristicRolledBackTransactions.length);
+ System.out.println(listHeuristicRolledBackTransactions[0]);
+ }
}
finally
{
@@ -177,9 +299,222 @@
server.stop();
}
}
+ }
+
+ public void testRecoverHeuristicCommitWithRestart() throws Exception
+ {
+ doRecoverHeuristicCompletedTxWithRestart(true);
+ }
+
+ public void testRecoverHeuristicRollbackWithRestart() throws Exception
+ {
+ doRecoverHeuristicCompletedTxWithRestart(false);
+ }
+ private void doRecoverHeuristicCompletedTxWithRestart(boolean heuristicCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (heuristicCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
+ preparedTransactions = jmxServer.listPreparedTransactions();
+ assertEquals(0, preparedTransactions.length);
+
+ if (heuristicCommit)
+ {
+ assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(123, msg.getBodySize());
+
+ session.commit();
+ session.close();
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ server.stop();
+
+ server.start();
+
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (heuristicCommit)
+ {
+ String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+ assertEquals(1, listHeuristicCommittedTransactions.length);
+ System.out.println(listHeuristicCommittedTransactions[0]);
+ } else
+ {
+ String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+ assertEquals(1, listHeuristicRolledBackTransactions.length);
+ System.out.println(listHeuristicRolledBackTransactions[0]);
+ }
+
+ session = sf.createSession(true, false, false);
+ Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, recoveredXids.length);
+ assertEquals(xid, recoveredXids[0]);
+ assertEquals(0, session.recover(XAResource.TMENDRSCAN).length);
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
}
+
+ public void testForgetHeuristicCommitAndRestart() throws Exception
+ {
+ doForgetHeuristicCompletedTxAndRestart(true);
+ }
+ public void testForgetHeuristicRollbackAndRestart() throws Exception
+ {
+ doForgetHeuristicCompletedTxAndRestart(false);
+ }
+
+ private void doForgetHeuristicCompletedTxAndRestart(boolean heuristicCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (heuristicCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
+ preparedTransactions = jmxServer.listPreparedTransactions();
+ assertEquals(0, preparedTransactions.length);
+
+ session.forget(xid);
+
+ session.close();
+
+ if (heuristicCommit)
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+
+ server.stop();
+
+ server.start();
+
+ session = sf.createSession(true, false, false);
+ Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(0, recoveredXids.length);
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (heuristicCommit)
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -362,6 +362,16 @@
{
return (String[])proxy.invokeOperation("listPreparedTransactions");
}
+
+ public String[] listHeuristicCommittedTransactions() throws Exception
+ {
+ return (String[])proxy.invokeOperation("listHeuristicCommittedTransactions");
+ }
+
+ public String[] listHeuristicRolledBackTransactions() throws Exception
+ {
+ return (String[])proxy.invokeOperation("listHeuristicRolledBackTransactions");
+ }
public String[] listRemoteAddresses() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -397,11 +397,76 @@
}
- public void testForget() throws Exception
+ public void testForgetUnknownXID() throws Exception
{
- clientSession.forget(newXID());
+ try
+ {
+ clientSession.forget(newXID());
+ fail("should throw a XAERR_NOTA XAException");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_NOTA, e.errorCode);
+ }
}
+ public void testForgetHeuristicallyCommittedXID() throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+ assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+
+ clientSession.forget(xid);
+
+ assertEquals(0, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+
+ public void testForgetHeuristicallyRolledBackXID() throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+
+ clientSession.forget(xid);
+
+ assertEquals(0, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+
+ public void testCommitHeuristicallyCommittedXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(true, true);
+ }
+
+ public void testCommitHeuristicallyRolledBackXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(true, false);
+ }
+
+ public void testRollbacktHeuristicallyCommittedXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(false, true);
+ }
+
+ public void testRollbackHeuristicallyRolledBackXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(false, false);
+ }
+
public void testSimpleJoin() throws Exception
{
SimpleString ADDRESS1 = new SimpleString("Address-1");
@@ -666,6 +731,62 @@
}
}
+ private void doCompleteHeuristicallyCompletedXID(boolean isCommit, boolean heuristicCommit) throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+
+ if (heuristicCommit)
+ {
+ assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+ assertEquals(0, messagingService.getHornetQServerControl().listPreparedTransactions().length);
+
+ try
+ {
+ if (isCommit)
+ {
+ clientSession.commit(xid, false);
+ } else
+ {
+ clientSession.rollback(xid);
+ }
+ fail("neither commit not rollback must succeed on a heuristically completed tx");
+ }
+
+ catch (XAException e)
+ {
+ if (heuristicCommit)
+ {
+ assertEquals(XAException.XA_HEURCOM, e.errorCode);
+ }
+ else
+ {
+ assertEquals(XAException.XA_HEURRB, e.errorCode);
+ }
+ }
+
+ if (heuristicCommit)
+ {
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+ }
+
class TxMessageHandler implements MessageHandler
{
boolean failedToAck = false;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -461,7 +461,17 @@
return null;
}
-
+
+ public String[] listHeuristicCommittedTransactions() throws Exception
+ {
+ return null;
+ }
+
+ public String[] listHeuristicRolledBackTransactions() throws Exception
+ {
+ return null;
+ }
+
public String[] listRemoteAddresses() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -1034,6 +1034,15 @@
{
}
+ public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ {
+ return -1;
+ }
+
+ public void deleteHeuristicCompletion(long txID) throws Exception
+ {
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#updateDeliveryCount(org.hornetq.core.server.MessageReference)
*/
14 years, 7 months
JBoss hornetq SVN: r8018 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-30 22:50:16 -0400 (Wed, 30 Sep 2009)
New Revision: 8018
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes...
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -75,9 +75,6 @@
// Load
- /** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
- * instead of load(List,List)
- */
long load(LoaderCallback reloadManager) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/TransactionFailureCallback.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -16,7 +16,7 @@
import java.util.List;
/**
- * A TransactionFailureCallback
+ * A Callback to receive information about bad transactions for extra cleanup required for broken transactions such as large messages.
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -163,13 +163,7 @@
journalDir = config.getJournalDirectory();
- if (journalDir == null)
- {
- throw new NullPointerException("journal-dir is null");
- }
- createJournalDir = config.isCreateJournalDir();
-
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
bindingsJournal = new JournalImpl(1024 * 1024,
@@ -181,6 +175,13 @@
"bindings",
1);
+ if (journalDir == null)
+ {
+ throw new NullPointerException("journal-dir is null");
+ }
+
+ createJournalDir = config.isCreateJournalDir();
+
syncNonTransactional = config.isJournalSyncNonTransactional();
syncTransactional = config.isJournalSyncTransactional();
@@ -738,7 +739,7 @@
messageJournal.perfBlast(perfBlastPages);
}
}
-
+
/**
* @param messages
* @param buff
@@ -1141,7 +1142,7 @@
// This should be accessed from this package only
void deleteFile(final SequentialFile file)
{
- executor.execute(new Runnable()
+ Runnable deleteAction = new Runnable()
{
public void run()
{
@@ -1155,7 +1156,16 @@
}
}
- });
+ };
+
+ if (executor == null)
+ {
+ deleteAction.run();
+ }
+ else
+ {
+ executor.execute(deleteAction);
+ }
}
/**
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -141,7 +141,7 @@
// Replication
- public static final byte REPLICATION_APPEND = 77;
+ public static final byte REPLICATION_APPEND = 80;
// Static --------------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationAddMessage.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -33,6 +33,9 @@
private long id;
+ /** 0 - Bindings, 1 - MessagesJournal */
+ private byte journalID;
+
private byte recordType;
private EncodingSupport encodingData;
@@ -42,27 +45,26 @@
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public ReplicationAddMessage()
{
super(REPLICATION_APPEND);
}
- public ReplicationAddMessage(long id, byte recordType, EncodingSupport encodingData)
+ public ReplicationAddMessage(byte journalID, long id, byte recordType, EncodingSupport encodingData)
{
this();
+ this.journalID = journalID;
this.id = id;
this.recordType = recordType;
this.encodingData = encodingData;
}
// Public --------------------------------------------------------
-
-
-
+
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE +
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_INT +
@@ -73,6 +75,7 @@
@Override
public void encodeBody(final HornetQBuffer buffer)
{
+ buffer.writeByte(journalID);
buffer.writeLong(id);
buffer.writeByte(recordType);
buffer.writeInt(encodingData.getEncodeSize());
@@ -82,6 +85,7 @@
@Override
public void decodeBody(final HornetQBuffer buffer)
{
+ journalID = buffer.readByte();
id = buffer.readLong();
recordType = buffer.readByte();
int size = buffer.readInt();
@@ -89,7 +93,38 @@
buffer.readBytes(recordData);
}
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+ /**
+ * @return the journalID
+ */
+ public byte getJournalID()
+ {
+ return journalID;
+ }
+
+ /**
+ * @return the recordType
+ */
+ public byte getRecordType()
+ {
+ return recordType;
+ }
+
+ /**
+ * @return the recordData
+ */
+ public byte[] getRecordData()
+ {
+ return recordData;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationEndpoint.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.server.HornetQComponent;
@@ -26,4 +27,8 @@
public interface ReplicationEndpoint extends ChannelHandler, HornetQComponent
{
+ void setChannel(Channel channel);
+
+ Channel getChannel();
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,5 +24,5 @@
*/
public interface ReplicationManager extends HornetQComponent
{
- void appendAddRecord(long id, byte recordType, EncodingSupport record);
+ void appendAddRecord(byte journalID, long id, byte recordType, EncodingSupport record);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,16 @@
package org.hornetq.core.replication.impl;
-import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPacket;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQServer;
/**
@@ -32,10 +37,20 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
+ private Channel channel;
+
+ private Journal bindingsJournal;
+
+ private Journal messagingJournal;
+
+ private JournalStorageManager storage;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -45,12 +60,26 @@
}
// Public --------------------------------------------------------
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
* @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
public void handlePacket(Packet packet)
{
- System.out.println("packet = " + packet);
+ try
+ {
+ if (packet.getType() == PacketImpl.REPLICATION_APPEND)
+ {
+ System.out.println("Replicated");
+ handleAppendAddRecord(packet);
+ }
+ }
+ catch (Exception e)
+ {
+ // TODO: What should happen when the IO fails on the backup side? Should we shut down the backup?
+ log.warn(e.getMessage(), e);
+ }
+ channel.send(new NullResponseMessage());
}
/* (non-Javadoc)
@@ -66,6 +95,17 @@
*/
public void start() throws Exception
{
+ Configuration config = server.getConfiguration();
+
+ // TODO: this needs an executor
+ JournalStorageManager storage = new JournalStorageManager(config, null);
+ storage.start();
+
+ this.bindingsJournal = storage.getBindingsJournal();
+ this.messagingJournal = storage.getBindingsJournal();
+
+ // We only need to load internal structures on the backup...
+ storage.loadInternalOnly();
}
/* (non-Javadoc)
@@ -73,14 +113,52 @@
*/
public void stop() throws Exception
{
+ storage.stop();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
+ */
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
+ */
+ public void setChannel(Channel channel)
+ {
+ this.channel = channel;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+ /**
+ * @param packet
+ * @throws Exception
+ */
+ private void handleAppendAddRecord(Packet packet) throws Exception
+ {
+ ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
+ Journal journalToUse;
+
+ if (addMessage.getJournalID() == (byte)0)
+ {
+ journalToUse = bindingsJournal;
+ }
+ else
+ {
+ journalToUse = messagingJournal;
+ }
+
+ journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -13,11 +13,19 @@
package org.hornetq.core.replication.impl;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
@@ -33,12 +41,15 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReplicationManagerImpl.class);
// Attributes ----------------------------------------------------
// TODO: where should this be configured?
private static final int WINDOW_SIZE = 100 * 1024;
+ private final ResponseHandler responseHandler = new ResponseHandler();
+
private final ConnectionManager connectionManager;
private RemotingConnection connection;
@@ -47,6 +58,16 @@
private boolean started;
+ private boolean playedResponsesOnFailure;
+
+ private final Object replicationLock = new Object();
+
+ private final Executor executor;
+
+ private final ThreadLocal<ReplicationToken> repliToken = new ThreadLocal<ReplicationToken>();
+
+ private final Queue<ReplicationToken> pendingTokens = new ConcurrentLinkedQueue<ReplicationToken>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -54,10 +75,11 @@
/**
* @param replicationConnectionManager
*/
- public ReplicationManagerImpl(ConnectionManager connectionManager)
+ public ReplicationManagerImpl(final ConnectionManager connectionManager, final Executor executor)
{
super();
this.connectionManager = connectionManager;
+ this.executor = executor;
}
// Public --------------------------------------------------------
@@ -65,11 +87,10 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#replicate(byte[], org.hornetq.core.replication.ReplicationToken)
*/
-
-
- public void appendAddRecord(long id, byte recordType, EncodingSupport encodingData)
+
+ public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport encodingData)
{
- replicatingChannel.send(new ReplicationAddMessage(id, recordType, encodingData));
+ sendReplicatePacket(new ReplicationAddMessage(journalID, id, recordType, encodingData));
}
/* (non-Javadoc)
@@ -85,22 +106,22 @@
*/
public synchronized void start() throws Exception
{
- this.started = true;
-
connection = connectionManager.getConnection(1);
long channelID = connection.generateChannelID();
Channel mainChannel = connection.getChannel(1, -1, false);
- Channel tempChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+ this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+ this.replicatingChannel.setHandler(this.responseHandler);
+
CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
WINDOW_SIZE);
mainChannel.sendBlocking(replicationStartPackage);
- this.replicatingChannel = tempChannel;
+ this.started = true;
}
/* (non-Javadoc)
@@ -114,7 +135,7 @@
}
this.started = false;
-
+
if (connection != null)
{
connection.destroy();
@@ -123,6 +144,63 @@
connection = null;
}
+ public ReplicationToken getReplicationToken()
+ {
+ ReplicationToken token = repliToken.get();
+ if (token == null)
+ {
+ token = new ReplicationTokenImpl(executor);
+ repliToken.set(token);
+ }
+ return token;
+ }
+
+ private void sendReplicatePacket(final Packet packet)
+ {
+ boolean runItNow = false;
+
+ ReplicationToken repliToken = getReplicationToken();
+ repliToken.linedUp();
+
+ synchronized (replicationLock)
+ {
+ if (playedResponsesOnFailure)
+ {
+ // The replicating channel has already failed, so just play the action now
+
+ runItNow = true;
+ }
+ else
+ {
+ pendingTokens.add(repliToken);
+
+ // TODO: Should I use connect.write directly here?
+ replicatingChannel.send(packet);
+ }
+ }
+
+ // Execute outside lock
+
+ if (runItNow)
+ {
+ repliToken.replicated();
+ }
+ }
+
+ private void replicated()
+ {
+ ReplicationToken tokenPolled = pendingTokens.poll();
+ if (tokenPolled == null)
+ {
+ // We should debug the logs if this happens
+ log.warn("Missing replication token on the stack. There is a bug on the ReplicatoinManager since this was not supposed to happen");
+ }
+ else
+ {
+ tokenPolled.replicated();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -131,4 +209,20 @@
// Inner classes -------------------------------------------------
+ protected class ResponseHandler implements ChannelHandler
+ {
+ /* (non-Javadoc)
+ * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+ */
+ public void handlePacket(Packet packet)
+ {
+ System.out.println("HandlePacket on client");
+ if (packet.getType() == PacketImpl.NULL_RESPONSE)
+ {
+ replicated();
+ }
+ }
+
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationTokenImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -46,6 +46,7 @@
public synchronized void linedUp()
{
pendings++;
+ System.out.println("pendings (lined up) = " + pendings);
}
/** To be called by the replication manager, when data is confirmed on the channel */
@@ -62,11 +63,13 @@
tasks.clear();
}
}
+ System.out.println("pendings (replicated) = " + pendings);
}
/** You may have several actions to be done after a replication operation is completed. */
public synchronized void addFutureCompletion(Runnable runnable)
{
+ System.out.println("pendings = " + pendings);
if (pendings == 0)
{
executor.execute(runnable);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -24,6 +24,7 @@
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -71,7 +72,7 @@
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
- ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
+ ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -196,7 +196,7 @@
try
{
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize(), false);
- ReplicationEndpoint endpoint = server.createReplicationEndpoint();
+ ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
channel.setHandler(endpoint);
response = new NullResponseMessage();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -200,7 +200,7 @@
private ReplicationManager replicationManager;
- private ReplicationEndpoint replicationEndpoint = new ReplicationEndpointImpl(this);
+ private ReplicationEndpoint replicationEndpoint;
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -593,7 +593,7 @@
return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
}
- public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
+ public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
{
if (!configuration.isBackup())
{
@@ -603,7 +603,11 @@
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
+ replicationEndpoint.start();
}
+
+ replicationEndpoint.setChannel(channel);
+
return replicationEndpoint;
}
@@ -712,7 +716,7 @@
scheduledPool,
null);
- this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager);
+ this.replicationManager = new ReplicationManagerImpl(replicatingConnectionManager, this.executorFactory.getExecutor());
replicationManager.start();
}
}
@@ -1113,6 +1117,8 @@
}
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
+
+ startReplication();
initialised = true;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-30 22:49:33 UTC (rev 8017)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-01 02:50:16 UTC (rev 8018)
@@ -18,16 +18,15 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -36,39 +35,19 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.impl.HornetQServerControlImpl;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.version.Version;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.SimpleString;
/**
* A ReplicationTest
@@ -111,7 +90,7 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
manager.stop();
}
@@ -134,7 +113,7 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
try
{
manager.start();
@@ -165,10 +144,20 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- manager.appendAddRecord(1, (byte)1, new DataImplement());
- Thread.sleep(1000);
+ manager.appendAddRecord((byte)0, 1, (byte)1, new DataImplement());
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.getReplicationToken().addFutureCompletion(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
manager.stop();
}
finally
@@ -176,7 +165,7 @@
server.stop();
}
}
-
+
class DataImplement implements EncodingSupport
{
@@ -196,7 +185,7 @@
{
return 5;
}
-
+
}
// Package protected ---------------------------------------------
@@ -279,313 +268,4 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
-
- static class FakeServer implements HornetQServer
- {
-
- public Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
- boolean durable,
- boolean temporary) throws Exception
- {
- return null;
- }
-
- public CreateSessionResponseMessage createSession(String name,
- long channelID,
- String username,
- String password,
- int minLargeMessageSize,
- int incrementingVersion,
- RemotingConnection remotingConnection,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean xa,
- int producerWindowSize) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#deployQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString, boolean, boolean)
- */
- public Queue deployQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
- boolean durable,
- boolean temporary) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#destroyQueue(org.hornetq.utils.SimpleString, org.hornetq.core.server.ServerSession)
- */
- public void destroyQueue(SimpleString queueName, ServerSession session) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getAddressSettingsRepository()
- */
- public HierarchicalRepository<AddressSettings> getAddressSettingsRepository()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getClusterManager()
- */
- public ClusterManager getClusterManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getConfiguration()
- */
- public Configuration getConfiguration()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getConnectionCount()
- */
- public int getConnectionCount()
- {
-
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getExecutorFactory()
- */
- public ExecutorFactory getExecutorFactory()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getHornetQServerControl()
- */
- public HornetQServerControlImpl getHornetQServerControl()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getMBeanServer()
- */
- public MBeanServer getMBeanServer()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getManagementService()
- */
- public ManagementService getManagementService()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getNodeID()
- */
- public SimpleString getNodeID()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getPostOffice()
- */
- public PostOffice getPostOffice()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getQueueFactory()
- */
- public QueueFactory getQueueFactory()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getRemotingService()
- */
- public RemotingService getRemotingService()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getResourceManager()
- */
- public ResourceManager getResourceManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSecurityManager()
- */
- public HornetQSecurityManager getSecurityManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSecurityRepository()
- */
- public HierarchicalRepository<Set<Role>> getSecurityRepository()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSession(java.lang.String)
- */
- public ServerSession getSession(String name)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSessions()
- */
- public Set<ServerSession> getSessions()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getSessions(java.lang.String)
- */
- public List<ServerSession> getSessions(String connectionID)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getStorageManager()
- */
- public StorageManager getStorageManager()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#getVersion()
- */
- public Version getVersion()
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#isInitialised()
- */
- public boolean isInitialised()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#isStarted()
- */
- public boolean isStarted()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#reattachSession(org.hornetq.core.remoting.RemotingConnection, java.lang.String, int)
- */
- public ReattachSessionResponseMessage reattachSession(RemotingConnection connection,
- String name,
- int lastReceivedCommandID) throws Exception
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#registerActivateCallback(org.hornetq.core.server.ActivateCallback)
- */
- public void registerActivateCallback(ActivateCallback callback)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#removeSession(java.lang.String)
- */
- public void removeSession(String name) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#unregisterActivateCallback(org.hornetq.core.server.ActivateCallback)
- */
- public void unregisterActivateCallback(ActivateCallback callback)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
- public void start() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQServer#createReplicationEndpoint()
- */
- public ReplicationEndpoint createReplicationEndpoint()
- {
- return new ReplicationEndpointImpl(this);
- }
-
- }
}
14 years, 7 months