JBoss hornetq SVN: r8301 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 15:01:28 -0500 (Tue, 17 Nov 2009)
New Revision: 8301
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
Log:
Just a tiny tweak to speed up the Bridge test... (since I had this pending here on my workspace)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-17 19:40:59 UTC (rev 8300)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-11-17 20:01:28 UTC (rev 8301)
@@ -18,11 +18,9 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
-import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -56,6 +54,8 @@
serviceConf.setJournalDirectory(getJournalDir(id, false));
serviceConf.setPagingDirectory(getPageDir(id, false));
serviceConf.setLargeMessagesDirectory(getLargeMessagesDir(id, false));
+ // these tests don't need any big storage so limiting the size of the journal files to speed up the test
+ serviceConf.setJournalFileSize(100 * 1024);
if (netty)
{
15 years, 1 month
JBoss hornetq SVN: r8300 - in branches/ClebertTemporary: src/main/org/hornetq/core/persistence/impl/journal and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 14:40:59 -0500 (Tue, 17 Nov 2009)
New Revision: 8300
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
My refactoring starting point
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -147,6 +147,4 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
- void sync();
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -498,14 +498,6 @@
messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
- public void sync()
- {
- if (replicator != null)
- {
- replicator.sync();
- }
- }
-
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -914,13 +914,6 @@
}
}
}
- else
- {
- if (storageManager.isReplicated())
- {
- storageManager.sync();
- }
- }
message.incrementRefCount(reference);
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -243,6 +243,8 @@
log.warn("Connection failure has been detected: " + me.getMessage() + " [code=" + me.getCode() + "]");
+ System.out.println("Fail on RemotingConnectio");
+
// Then call the listeners
callFailureListeners(me);
@@ -399,6 +401,7 @@
for (final FailureListener listener : listenersClone)
{
+ System.out.println("Calling failure listener: " + listener.getClass().getName());
try
{
listener.connectionFailed(me);
@@ -419,6 +422,8 @@
for (final CloseListener listener : listenersClone)
{
+ System.out.println("Calling listener -> " + listener);
+ System.out.println("Calling listener " + listener.getClass().getName());
try
{
listener.connectionClosed();
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -26,6 +26,8 @@
{
/** To be called by the replication manager, when new replication is added to the queue */
void linedUp();
+
+ boolean hasReplication();
/** To be called by the replication manager, when data is confirmed on the channel */
void replicated();
@@ -37,5 +39,9 @@
/** Flush all pending callbacks on the Context */
void flush();
+
+ boolean isSync();
+
+ void setSync(boolean sync);
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -88,6 +88,4 @@
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
- void sync();
-
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -29,11 +29,15 @@
public class ReplicationContextImpl implements ReplicationContext
{
private List<Runnable> tasks;
-
- private AtomicInteger pendings = new AtomicInteger(0);
-
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean sync = false;
+
private volatile boolean complete = false;
-
+
/**
* @param executor
*/
@@ -45,9 +49,14 @@
/** To be called by the replication manager, when new replication is added to the queue */
public void linedUp()
{
- pendings.incrementAndGet();
+ linedup++;
}
+ public boolean hasReplication()
+ {
+ return linedup > 0;
+ }
+
/** You may have several actions to be done after a replication operation is completed. */
public void addReplicationAction(Runnable runnable)
{
@@ -63,32 +72,32 @@
// We don't add any more Runnables after it is complete
tasks = new ArrayList<Runnable>();
}
-
+
tasks.add(runnable);
}
/** To be called by the replication manager, when data is confirmed on the channel */
public synchronized void replicated()
{
- if (pendings.decrementAndGet() == 0 && complete)
+ // roundtrip packets won't have lined up packets
+ if (++replicated == linedup && complete)
{
flush();
}
}
-
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationToken#complete()
*/
public synchronized void complete()
{
complete = true;
- if (pendings.get() == 0 && complete)
+ if (replicated == linedup && complete)
{
flush();
}
- }
-
+ }
+
public synchronized void flush()
{
if (tasks != null)
@@ -100,6 +109,18 @@
tasks.clear();
}
}
-
-
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+ */
+ public boolean isSync()
+ {
+ return sync;
+ }
+
+ public void setSync(final boolean sync)
+ {
+ this.sync = sync;
+ }
+
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication.impl;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -34,7 +36,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -84,8 +85,6 @@
private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
- private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -278,14 +277,6 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
-
- public void sync()
- {
- if (enabled)
- {
- sendReplicatePacket(new ReplicationSyncContextMessage());
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long, byte[])
@@ -351,9 +342,9 @@
log.warn(e.getMessage(), e);
}
}
-
+
public void beforeReconnect(HornetQException me)
- {
+ {
}
});
@@ -369,14 +360,22 @@
{
enabled = false;
+ LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+ while (!pendingTokens.isEmpty())
+ {
+ ReplicationContext ctx = pendingTokens.poll();
+ activeContexts.add(ctx);
+ }
+
for (ReplicationContext ctx : activeContexts)
{
ctx.complete();
ctx.flush();
}
-
- activeContexts.clear();
-
+
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -400,7 +399,6 @@
if (token == null)
{
token = new ReplicationContextImpl();
- activeContexts.add(token);
tlReplicationContext.set(token);
}
return token;
@@ -420,29 +418,38 @@
public void closeContext()
{
final ReplicationContext token = tlReplicationContext.get();
-
+
if (token != null)
{
// Disassociate thread local
tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
- token.addReplicationAction(new Runnable()
+ if (!token.hasReplication())
{
- public void run()
- {
- activeContexts.remove(token);
- }
- });
+ sync(token);
+ }
token.complete();
}
}
- /* (non-Javadoc)
+ /* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
public Set<ReplicationContext> getActiveTokens()
{
+
+ LinkedHashSet<ReplicationContext> activeContexts = new LinkedHashSet<ReplicationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+
+ for (ReplicationContext ctx : pendingTokens)
+ {
+ activeContexts.add(ctx);
+ }
+
return activeContexts;
+
}
private void sendReplicatePacket(final Packet packet)
@@ -475,7 +482,7 @@
repliToken.replicated();
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
*/
@@ -484,18 +491,14 @@
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
-
private void replicated()
{
- ReplicationContext tokenPolled = pendingTokens.poll();
- if (tokenPolled == null)
+ ArrayList<ReplicationContext> tokensToExecute = getTokens();
+
+ for (ReplicationContext ctx : tokensToExecute)
{
- throw new IllegalStateException("Missing replication token on the queue.");
+ ctx.replicated();
}
- else
- {
- tokenPolled.replicated();
- }
}
// Package protected ---------------------------------------------
@@ -504,6 +507,80 @@
// Private -------------------------------------------------------
+ private void sync(ReplicationContext context)
+ {
+ boolean executeNow = false;
+ synchronized (replicationLock)
+ {
+ context.linedUp();
+ context.setSync(true);
+ if (pendingTokens.isEmpty())
+ {
+ // this means the list is empty and we should process it now
+ executeNow = true;
+ }
+ else
+ {
+ // adding the sync to be executed in order
+ // as soon as the reponses are back from the backup
+ this.pendingTokens.add(context);
+ }
+ }
+ if (executeNow)
+ {
+ context.replicated();
+ }
+ }
+
+
+ /**
+ * This method will first get all the sync tokens (that won't go to the backup node)
+ * Then it will get the round trip tokens.
+ * At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
+ * @return
+ */
+ private ArrayList<ReplicationContext> getTokens()
+ {
+ ArrayList<ReplicationContext> retList = new ArrayList<ReplicationContext>(1);
+
+ ReplicationContext tokenPolled = null;
+
+ // First will get all the non replicated tokens up to the first one that is not replicated
+ do
+ {
+ tokenPolled = pendingTokens.poll();
+
+ if (tokenPolled == null)
+ {
+ throw new IllegalStateException("Missing replication token on the queue.");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ while (tokenPolled.isSync());
+
+ // This is to avoid a situation where we won't have more replicated packets
+ // all the packets will need to be processed
+ synchronized (replicationLock)
+ {
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+ {
+ tokenPolled = pendingTokens.poll();
+ if (!tokenPolled.isSync())
+ {
+ throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ }
+
+ return retList;
+ }
+
+
// Inner classes -------------------------------------------------
protected class ResponseHandler implements ChannelHandler
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17 19:40:49 UTC (rev 8299)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-17 19:40:59 UTC (rev 8300)
@@ -539,10 +539,6 @@
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- else
- {
- manager.sync();
- }
manager.afterReplicated(new Runnable()
15 years, 1 month
JBoss hornetq SVN: r8299 - in trunk: src/main/org/hornetq/core/config/cluster and 5 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-17 14:40:49 -0500 (Tue, 17 Nov 2009)
New Revision: 8299
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-178
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -594,6 +594,8 @@
catch (Exception ignore)
{
}
+
+ this.cancelPinger();
connector = null;
@@ -624,7 +626,7 @@
connection = null;
}
-
+
callFailureListeners(me, true);
if (connection == null)
@@ -658,7 +660,7 @@
final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
for (final SessionFailureListener listener : listenersClone)
- {
+ {
try
{
if (afterReconnect)
@@ -685,18 +687,18 @@
*/
private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts)
{
- RemotingConnection backupConnection = getConnectionWithRetry(reconnectAttempts);
+ RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
- if (backupConnection == null)
+ if (newConnection == null)
{
log.warn("Failed to connect to server.");
return;
}
-
+
List<FailureListener> oldListeners = oldConnection.getFailureListeners();
- List<FailureListener> newListeners = new ArrayList<FailureListener>(backupConnection.getFailureListeners());
+ List<FailureListener> newListeners = new ArrayList<FailureListener>(newConnection.getFailureListeners());
for (FailureListener listener : oldListeners)
{
@@ -708,11 +710,11 @@
}
}
- backupConnection.setFailureListeners(newListeners);
+ newConnection.setFailureListeners(newListeners);
for (ClientSessionInternal session : sessions)
{
- session.handleFailover(backupConnection);
+ session.handleFailover(newConnection);
}
}
@@ -775,22 +777,27 @@
}
}
}
+
+ private void cancelPinger()
+ {
+ if (pingerFuture != null)
+ {
+ pingRunnable.cancel();
+ pingerFuture.cancel(false);
+
+ pingRunnable = null;
+
+ pingerFuture = null;
+ }
+ }
+
private void checkCloseConnection()
{
if (connection != null && sessions.size() == 0)
{
- if (pingerFuture != null)
- {
- pingRunnable.cancel();
+ cancelPinger();
- pingerFuture.cancel(false);
-
- pingRunnable = null;
-
- pingerFuture = null;
- }
-
try
{
connection.destroy();
@@ -817,7 +824,7 @@
}
public RemotingConnection getConnection()
- {
+ {
if (connection == null)
{
Connection tc = null;
@@ -922,7 +929,7 @@
}
}
}
-
+
return connection;
}
@@ -1082,6 +1089,8 @@
final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Did not receive data from server for " + connection.getTransportConnection());
+ cancelled = true;
+
threadPool.execute(new Runnable()
{
// Must be executed on different thread
@@ -1090,7 +1099,7 @@
connection.fail(me);
}
});
-
+
return;
}
else
Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -55,7 +55,9 @@
private boolean useDuplicateDetection;
private int confirmationWindowSize;
-
+
+ private long clientFailureCheckPeriod;
+
public BridgeConfiguration(final String name,
final String queueName,
final String forwardingAddress,
@@ -67,6 +69,7 @@
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
+ final long clientFailureCheckPeriod,
final Pair<String, String> connectorPair)
{
this.name = name;
@@ -80,6 +83,7 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.connectorPair = connectorPair;
this.discoveryGroupName = null;
}
@@ -95,6 +99,7 @@
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
+ final long clientFailureCheckPeriod,
final String discoveryGroupName)
{
this.name = name;
@@ -108,6 +113,7 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
this.confirmationWindowSize = confirmationWindowSize;
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
this.connectorPair = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -176,6 +182,11 @@
{
return confirmationWindowSize;
}
+
+ public long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
/**
* @param name the name to set
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -681,6 +681,7 @@
failoverOnServerShutdown,
useDuplicateDetection,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
}
else
@@ -696,6 +697,7 @@
failoverOnServerShutdown,
useDuplicateDetection,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
discoveryGroupName);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -17,6 +17,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
@@ -29,6 +31,7 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -124,6 +127,7 @@
true,
false,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -246,6 +250,7 @@
true,
false,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -363,6 +368,7 @@
true,
false,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -432,9 +438,21 @@
assertEquals(0, server0.getRemotingService().getConnections().size());
assertEquals(0, server1.getRemotingService().getConnections().size());
}
-
+
+ //We test that we can pause more than client failure check period (to prompt the pinger to failing)
+ //before reconnecting
+ public void testShutdownServerCleanlyAndReconnectSameNodeWithSleep() throws Exception
+ {
+ testShutdownServerCleanlyAndReconnectSameNode(true);
+ }
+
public void testShutdownServerCleanlyAndReconnectSameNode() throws Exception
{
+ testShutdownServerCleanlyAndReconnectSameNode(false);
+ }
+
+ private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception
+ {
Map<String, Object> server0Params = new HashMap<String, Object>();
HornetQServer server0 = createHornetQServer(0, isNetty(), server0Params);
@@ -461,6 +479,7 @@
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = -1;
final int confirmationWindowSize = 1024;
+ final long clientFailureCheckPeriod = 1000;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
@@ -475,6 +494,7 @@
true,
false,
confirmationWindowSize,
+ clientFailureCheckPeriod,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -501,6 +521,12 @@
log.info("stopping server1");
server1.stop();
+
+ if (sleep)
+ {
+ Thread.sleep(2 * clientFailureCheckPeriod);
+ }
+
log.info("restarting server1");
server1.start();
log.info("server 1 restarted");
@@ -591,6 +617,7 @@
true,
false,
confirmationWindowSize,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -110,6 +110,7 @@
true,
true,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -259,6 +260,7 @@
true,
true,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -455,6 +457,7 @@
false,
false,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -589,6 +592,7 @@
false,
true,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -136,6 +136,7 @@
// Choose confirmation size to make sure acks
// are sent
numMessages * messageSize / 2,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -329,6 +330,7 @@
true,
false,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -497,6 +499,7 @@
true,
false,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -625,6 +628,7 @@
true,
false,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -116,6 +116,7 @@
true,
true,
1024,
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
"dg1");
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -38,10 +38,9 @@
*/
public class ReplicatedDistributionTest extends ClusterTestBase
{
-
// Constants -----------------------------------------------------
- static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+ private static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
// Attributes ----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -26,6 +26,7 @@
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.cluster.BridgeConfiguration;
@@ -163,6 +164,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -143,6 +143,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 19:38:24 UTC (rev 8298)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 19:40:49 UTC (rev 8299)
@@ -17,12 +17,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.TestSuite;
-
import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -38,6 +37,8 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ReconnectTest.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -46,19 +47,6 @@
// Public --------------------------------------------------------
- // This is a hack to remove this test from the testsuite
- // Remove this method to enable this Test on the testsuite.
- // You can still run tests individually on eclipse, but not on the testsuite
- public static TestSuite suite()
- {
- TestSuite suite = new TestSuite();
-
- // System.out -> JUnit report
- System.out.println("Test ReconnectTest being ignored for now!");
-
- return suite;
- }
-
public void testReconnectNetty() throws Exception
{
internalTestReconnect(true);
@@ -71,9 +59,8 @@
public void internalTestReconnect(final boolean isNetty) throws Exception
{
-
final int pingPeriod = 1000;
-
+
HornetQServer server = createServer(false, isNetty);
server.start();
@@ -97,7 +84,7 @@
final CountDownLatch latch = new CountDownLatch(1);
- session.getConnection().addFailureListener(new FailureListener()
+ session.addFailureListener(new SessionFailureListener()
{
public void connectionFailed(final HornetQException me)
@@ -106,12 +93,14 @@
latch.countDown();
}
+ public void beforeReconnect(HornetQException exception)
+ {
+ }
+
});
server.stop();
- // I couldn't find a way to install a latch here as I couldn't just use the FailureListener
- // as the FailureListener won't be informed until the reconnection process is done.
Thread.sleep((int)(pingPeriod * 2));
server.start();
15 years, 1 month
JBoss hornetq SVN: r8298 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 14:38:24 -0500 (Tue, 17 Nov 2009)
New Revision: 8298
Added:
branches/ClebertTemporary/
Log:
Creating a temporary play ground for a refactoring I'm making
Copied: branches/ClebertTemporary (from rev 8297, trunk)
15 years, 1 month
JBoss hornetq SVN: r8297 - trunk/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-17 10:55:41 -0500 (Tue, 17 Nov 2009)
New Revision: 8297
Modified:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
Log:
small fix for transaction ordering
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-17 10:54:10 UTC (rev 8296)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-17 15:55:41 UTC (rev 8297)
@@ -40,7 +40,7 @@
public class ReplicationOrderTest extends FailoverTestBase
{
- public static final int NUM = 100;
+ public static final int NUM = 300;
// Constants -----------------------------------------------------
@@ -92,12 +92,24 @@
}
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
+ boolean durable = true;
for (int i = 0; i < NUM; i++)
{
- boolean durable = (i % 2 == 0);
ClientMessage msg = session.createClientMessage(durable);
msg.putIntProperty("counter", i);
producer.send(msg);
+ if (transactional)
+ {
+ if (i % 10 == 0)
+ {
+ session.commit();
+ durable = !durable;
+ }
+ }
+ else
+ {
+ durable = !durable;
+ }
}
if (transactional)
{
15 years, 1 month
JBoss hornetq SVN: r8296 - trunk/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-17 05:54:10 -0500 (Tue, 17 Nov 2009)
New Revision: 8296
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
only add the remote binding to the bindings map if it doesnt already exist
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-17 00:16:29 UTC (rev 8295)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-17 10:54:10 UTC (rev 8296)
@@ -673,7 +673,6 @@
bridge.getName(),
distance + 1);
- bindings.put(clusterName, binding);
if (postOffice.getBinding(clusterName) != null)
{
@@ -688,6 +687,8 @@
return;
}
+ bindings.put(clusterName, binding);
+
try
{
postOffice.addBinding(binding);
15 years, 1 month
JBoss hornetq SVN: r8295 - trunk/tests/src/org/hornetq/tests/integration/remoting.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-16 19:16:29 -0500 (Mon, 16 Nov 2009)
New Revision: 8295
Modified:
trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
tweaks on test
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 00:09:21 UTC (rev 8294)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 00:16:29 UTC (rev 8295)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.remoting;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestSuite;
@@ -71,6 +72,8 @@
public void internalTestReconnect(final boolean isNetty) throws Exception
{
+ final int pingPeriod = 1000;
+
HornetQServer server = createServer(false, isNetty);
server.start();
@@ -82,7 +85,7 @@
ClientSessionFactory factory = createFactory(isNetty);
- factory.setClientFailureCheckPeriod(2000);
+ factory.setClientFailureCheckPeriod(pingPeriod); // Using a smaller timeout
factory.setRetryInterval(500);
factory.setRetryIntervalMultiplier(1d);
factory.setReconnectAttempts(-1);
@@ -109,13 +112,13 @@
// I couldn't find a way to install a latch here as I couldn't just use the FailureListener
// as the FailureListener won't be informed until the reconnection process is done.
- Thread.sleep(2100);
+ Thread.sleep((int)(pingPeriod * 2));
server.start();
- latch.await();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
- // Some process to let the Failure listener loop occur
+ // Some time to let possible loops to occur
Thread.sleep(500);
assertEquals(1, count.get());
15 years, 1 month
JBoss hornetq SVN: r8294 - trunk/tests/src/org/hornetq/tests/integration/remoting.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-16 19:09:21 -0500 (Mon, 16 Nov 2009)
New Revision: 8294
Modified:
trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
just tweaks / formatting
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 00:08:14 UTC (rev 8293)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 00:09:21 UTC (rev 8294)
@@ -44,33 +44,31 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
-
+
// This is a hack to remove this test from the testsuite
// Remove this method to enable this Test on the testsuite.
// You can still run tests individually on eclipse, but not on the testsuite
public static TestSuite suite()
{
TestSuite suite = new TestSuite();
-
+
// System.out -> JUnit report
System.out.println("Test ReconnectTest being ignored for now!");
-
+
return suite;
}
-
-
+
public void testReconnectNetty() throws Exception
{
internalTestReconnect(true);
}
-
+
public void testReconnectInVM() throws Exception
{
internalTestReconnect(false);
}
- public void internalTestReconnect(boolean isNetty) throws Exception
+ public void internalTestReconnect(final boolean isNetty) throws Exception
{
HornetQServer server = createServer(false, isNetty);
@@ -78,7 +76,7 @@
server.start();
ClientSessionInternal session = null;
-
+
try
{
@@ -93,35 +91,35 @@
session = (ClientSessionInternal)factory.createSession();
final AtomicInteger count = new AtomicInteger(0);
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
session.getConnection().addFailureListener(new FailureListener()
{
- public void connectionFailed(HornetQException me)
+ public void connectionFailed(final HornetQException me)
{
count.incrementAndGet();
latch.countDown();
}
-
+
});
-
+
server.stop();
-
+
// I couldn't find a way to install a latch here as I couldn't just use the FailureListener
// as the FailureListener won't be informed until the reconnection process is done.
Thread.sleep(2100);
-
+
server.start();
-
+
latch.await();
// Some process to let the Failure listener loop occur
Thread.sleep(500);
-
+
assertEquals(1, count.get());
-
+
}
finally
{
@@ -132,7 +130,7 @@
catch (Throwable e)
{
}
-
+
server.stop();
}
15 years, 1 month
JBoss hornetq SVN: r8293 - trunk/tests/src/org/hornetq/tests/integration/remoting.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-16 19:08:14 -0500 (Mon, 16 Nov 2009)
New Revision: 8293
Added:
trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-178 - Adding test replicating the failure
Added: trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/ReconnectTest.java 2009-11-17 00:08:14 UTC (rev 8293)
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.remoting;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ReconnectSimpleTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReconnectTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ // This is a hack to remove this test from the testsuite
+ // Remove this method to enable this Test on the testsuite.
+ // You can still run tests individually on eclipse, but not on the testsuite
+ public static TestSuite suite()
+ {
+ TestSuite suite = new TestSuite();
+
+ // System.out -> JUnit report
+ System.out.println("Test ReconnectTest being ignored for now!");
+
+ return suite;
+ }
+
+
+ public void testReconnectNetty() throws Exception
+ {
+ internalTestReconnect(true);
+ }
+
+ public void testReconnectInVM() throws Exception
+ {
+ internalTestReconnect(false);
+ }
+
+ public void internalTestReconnect(boolean isNetty) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty);
+
+ server.start();
+
+ ClientSessionInternal session = null;
+
+ try
+ {
+
+ ClientSessionFactory factory = createFactory(isNetty);
+
+ factory.setClientFailureCheckPeriod(2000);
+ factory.setRetryInterval(500);
+ factory.setRetryIntervalMultiplier(1d);
+ factory.setReconnectAttempts(-1);
+ factory.setConfirmationWindowSize(1024 * 1024);
+
+ session = (ClientSessionInternal)factory.createSession();
+
+ final AtomicInteger count = new AtomicInteger(0);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ session.getConnection().addFailureListener(new FailureListener()
+ {
+
+ public void connectionFailed(HornetQException me)
+ {
+ count.incrementAndGet();
+ latch.countDown();
+ }
+
+ });
+
+ server.stop();
+
+ // I couldn't find a way to install a latch here as I couldn't just use the FailureListener
+ // as the FailureListener won't be informed until the reconnection process is done.
+ Thread.sleep(2100);
+
+ server.start();
+
+ latch.await();
+
+ // Some process to let the Failure listener loop occur
+ Thread.sleep(500);
+
+ assertEquals(1, count.get());
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ }
+
+ server.stop();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 1 month
JBoss hornetq SVN: r8292 - trunk/tests/src/org/hornetq/tests/integration/replication.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-16 10:43:05 -0500 (Mon, 16 Nov 2009)
New Revision: 8292
Modified:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
Log:
HORNETQ-218: Incorrect order when persistent and non-persistent messages are sent over replication
* added ReplicationOrderTest for transaction case
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-16 15:07:00 UTC (rev 8291)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-16 15:43:05 UTC (rev 8292)
@@ -57,7 +57,7 @@
for (int i = 0; i < 100; i++)
{
System.out.println("<<<<<< " + i + " >>>>>>>");
- testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
+ testTxMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
tearDown();
setUp();
}
@@ -65,13 +65,31 @@
public void testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup() throws Exception
{
+ doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(false);
+ }
+
+ public void testTxMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup() throws Exception
+ {
+ doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(true);
+ }
+
+ private void doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(boolean transactional) throws Exception
+ {
String address = randomString();
String queue = randomString();
ClientSessionFactory csf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
csf.setBlockOnNonPersistentSend(false);
csf.setBlockOnPersistentSend(false);
- ClientSession session = csf.createSession(true, true);
+ ClientSession session = null;
+ if (transactional)
+ {
+ session = csf.createSession(false, false);
+ }
+ else
+ {
+ session = csf.createSession(true, true);
+ }
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < NUM; i++)
@@ -81,6 +99,10 @@
msg.putIntProperty("counter", i);
producer.send(msg);
}
+ if (transactional)
+ {
+ session.commit();
+ }
session.close();
csf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
@@ -93,10 +115,10 @@
assertNotNull(message);
assertEquals(i, message.getIntProperty("counter").intValue());
}
-
+
consumer.close();
session.deleteQueue(queue);
-
+
session.close();
}
15 years, 1 month