Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 01:25:26 -0400 (Thu, 16 Jun 2011)
New Revision: 10815
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
Some cleanup on bridges
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -133,10 +133,16 @@
int ackBatchSize) throws HornetQException;
void close();
+
+ /**
+ * As opposed to close(), this will call cleanup only on every created session and its children objects.
+ */
+ void cleanup();
ServerLocator getServerLocator();
CoreRemotingConnection getConnection();
boolean isClosed();
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -129,7 +129,7 @@
private final long maxRetryInterval;
- private final int reconnectAttempts;
+ private int reconnectAttempts;
private final Set<SessionFailureListener> listeners = new
ConcurrentHashSet<SessionFailureListener>();
@@ -450,7 +450,45 @@
closed = true;
}
+
+ public void cleanup()
+ {
+ if (closed)
+ {
+ return;
+ }
+ // We need to stop the factory from connecting if it is in the middle of trying to fail over before we get the lock
+ causeExit();
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ HashSet<ClientSessionInternal> sessionsToClose;
+ synchronized (sessions)
+ {
+ sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ }
+ // Work on a copied set. The session will be removed from sessions when session.close() is called
+ for (ClientSessionInternal session : sessionsToClose)
+ {
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to close session", e);
+ }
+ }
+
+ checkCloseConnection();
+ }
+ }
+
+ closed = true;
+ }
+
public boolean isClosed()
{
return closed;
@@ -1447,4 +1485,13 @@
cancelled = true;
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
+ */
+ public void setReconnectAttempts(int attempts)
+ {
+ this.reconnectAttempts = attempts;
+ }
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -46,4 +46,6 @@
void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
Object getBackupConnector();
+
+ void setReconnectAttempts(int i);
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -1457,7 +1457,7 @@
{
try
{
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -18,6 +18,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
@@ -28,6 +30,7 @@
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
@@ -53,6 +56,7 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
*
* Created 12 Nov 2008 11:37:35
*
@@ -64,13 +68,13 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(BridgeImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
-
+
private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new
SimpleString("jms.queue.");
-
+
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new
SimpleString("jms.topic.");
protected final ServerLocatorInternal serverLocator;
@@ -83,6 +87,11 @@
protected final Executor executor;
+ protected final ScheduledExecutorService scheduledExecutor;
+
+ /** Used when there's a scheduled reconnection */
+ protected ScheduledFuture<?> futureScheduledReconnection;
+
private final Filter filter;
private final SimpleString forwardingAddress;
@@ -102,7 +111,7 @@
private final boolean useDuplicateDetection;
private volatile boolean active;
-
+
private volatile boolean stopping;
private final String user;
@@ -111,6 +120,18 @@
private boolean activated;
+ private final int reconnectAttempts;
+
+ private int reconnectAttemptsInUse;
+
+ private final long retryInterval;
+
+ private final double retryMultiplier;
+
+ private final long maxRetryInterval;
+
+ private int retryCount = 0;
+
private NotificationService notificationService;
// Static --------------------------------------------------------
@@ -120,6 +141,10 @@
// Public --------------------------------------------------------
public BridgeImpl(final ServerLocatorInternal serverLocator,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final double retryMultiplier,
+ final long maxRetryInterval,
final UUID nodeUUID,
final SimpleString name,
final Queue queue,
@@ -134,6 +159,17 @@
final boolean activated,
final StorageManager storageManager) throws Exception
{
+
+ this.reconnectAttempts = reconnectAttempts;
+
+ this.reconnectAttemptsInUse = -1;
+
+ this.retryInterval = retryInterval;
+
+ this.retryMultiplier = retryMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
this.serverLocator = serverLocator;
this.nodeUUID = nodeUUID;
@@ -144,6 +180,8 @@
this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
+
filter = FilterImpl.createFilter(filterString);
this.forwardingAddress = forwardingAddress;
@@ -187,7 +225,7 @@
}
}
- private void cancelRefs() throws Exception
+ private void cancelRefs()
{
MessageReference ref;
@@ -201,25 +239,32 @@
}
list.addFirst(ref);
}
-
+
if (isTrace && list.isEmpty())
{
- log.trace("didn't have any references to cancel on bridge " +
this);
+ log.trace("didn't have any references to cancel on bridge " +
this);
}
- Queue queue = null;
-
+ Queue refqueue = null;
+
long timeBase = System.currentTimeMillis();
for (MessageReference ref2 : list)
{
- queue = ref2.getQueue();
+ refqueue = ref2.getQueue();
- queue.cancel(ref2, timeBase);
+ try
+ {
+ refqueue.cancel(ref2, timeBase);
+ }
+ catch (Exception e)
+ {
+ // There isn't much we can do besides log an error
+ log.error("Couldn't cancel reference " + ref2, e);
+ }
}
+ }
- }
-
public void flushExecutor()
{
// Wait for any create objects runnable to complete
@@ -235,18 +280,22 @@
}
}
-
public void stop() throws Exception
{
- if (log.isDebugEnabled())
- {
- log.debug("Bridge " + this.name + " being stopped");
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being stopped");
+ }
+
+ stopping = true;
- stopping = true;
+ if (futureScheduledReconnection != null)
+ {
+ futureScheduledReconnection.cancel(true);
+ }
executor.execute(new StopRunnable());
-
+
if (notificationService != null)
{
TypedProperties props = new TypedProperties();
@@ -265,10 +314,10 @@
public void pause() throws Exception
{
- if (log.isDebugEnabled())
- {
- log.debug("Bridge " + this.name + " being paused");
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being paused");
+ }
executor.execute(new PauseRunnable());
@@ -288,13 +337,13 @@
}
}
- public void resume() throws Exception
- {
- queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync();
- }
+ public void resume() throws Exception
+ {
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
+ }
- public boolean isStarted()
+ public boolean isStarted()
{
return started;
}
@@ -303,7 +352,7 @@
{
activated = true;
- executor.execute(new CreateObjectsRunnable());
+ executor.execute(new ConnectRunnable());
}
public SimpleString getName()
@@ -377,7 +426,7 @@
{
// We keep our own DuplicateID for the Bridge, so bouncing back and forths will
work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
-
+
message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
@@ -396,13 +445,13 @@
public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
{
byte[] bytes = new byte[24];
-
+
ByteBuffer bb = ByteBuffer.wrap(bytes);
-
+
bb.put(nodeUUID.asBytes());
-
+
bb.putLong(messageID);
-
+
return bytes;
}
@@ -421,16 +470,16 @@
return HandleStatus.BUSY;
}
- if (isTrace)
- {
- log.trace("Bridge " + name + " is handling reference=" +
ref);
- }
+ if (isTrace)
+ {
+ log.trace("Bridge " + name + " is handling reference=" +
ref);
+ }
ref.handled();
ServerMessage message = ref.getMessage();
refs.add(ref);
-
+
message = beforeForward(message);
SimpleString dest;
@@ -444,10 +493,10 @@
// Preserve the original address
dest = message.getAddress();
}
- //if we failover during send then there is a chance that the
- //that this will throw a disconnect, we need to remove the message
- //from the acks so it will get resent, duplicate detection will cope
- //with any messages resent
+ // If we fail over during send then there is a chance
+ // that this will throw a disconnect; we need to remove the message
+ // from the acks so it will get resent. Duplicate detection will cope
+ // with any messages resent.
try
{
producer.send(dest, message);
@@ -467,14 +516,34 @@
// FailureListener implementation --------------------------------
- public void connectionFailed(final HornetQException me, boolean failedOver)
+ public final void connectionFailed(final HornetQException me, boolean failedOver)
{
log.warn(name + "::Connection failed with failedOver=" + failedOver,
me);
if (isTrace)
{
- log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me="
+ me + ", boolean failedOver=" + failedOver);
+ log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me="
+ me +
+ ", boolean failedOver=" +
+ failedOver);
}
+ try
+ {
+ csf.cleanup();
+ }
+ catch (Throwable dontCare)
+ {
+ }
+
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Throwable dontCare)
+ {
+ }
+
fail(false);
+
+ scheduleRetryConnect();
}
public void beforeReconnect(final HornetQException exception)
@@ -482,8 +551,6 @@
log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
-
-
// Package protected ---------------------------------------------
@@ -497,8 +564,8 @@
@Override
public String toString()
{
- return this.getClass().getName() +
- " [name=" + name +
+ return this.getClass().getName() + " [name=" +
+ name +
", nodeUUID=" +
nodeUUID +
", queue=" +
@@ -516,212 +583,199 @@
"]";
}
- private void fail(final boolean beforeReconnect)
+ protected void fail(final boolean permanently)
{
- // This will get called even after the bridge reconnects - in this case
- // we want to cancel all unacked refs so they get resent
- // duplicate detection will ensure no dups are routed on the other side
+ log.debug(name + "::BridgeImpl::fail being called, permanently=" +
permanently);
- log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" +
beforeReconnect);
+ if (queue != null)
+ {
+ try
+ {
+ queue.removeConsumer(this);
+ }
+ catch (Exception dontcare)
+ {
+ log.debug(dontcare);
+ }
+ }
- if (session.getConnection().isDestroyed())
+ cancelRefs();
+ if (queue != null)
{
- log.debug(name + "::Connection is destroyed, active = false now");
- active = false;
+ queue.deliverAsync();
}
-
-
- if (!session.getConnection().isDestroyed())
- {
- if (beforeReconnect)
- {
- try {
- log.debug(name + "::Connection is destroyed, active = false
now");
-
- cancelRefs();
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to cancel refs", e);
- }
- }
- else
- {
- try
- {
- afterConnect();
-
- log.debug(name + "::After reconnect, setting active=true
now");
- active = true;
-
- if (queue != null)
- {
- queue.deliverAsync();
- }
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to call after connect", e);
- }
- }
- }
}
/* Hook for doing extra stuff after connection */
protected void afterConnect() throws Exception
{
- //NOOP
+ retryCount = 0;
+ reconnectAttemptsInUse = reconnectAttempts;
}
/* Hook for creating session factory */
- protected ClientSessionFactory createSessionFactory() throws Exception
+ protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
- return serverLocator.createSessionFactory();
+ ClientSessionFactoryInternal csf =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+ // csf.setReconnectAttempts(0);
+ //csf.setInitialReconnectAttempts(1);
+ return csf;
}
/* This is called only when the bridge is activated */
- protected synchronized boolean createObjects()
+ protected void connect()
{
- if (!started)
- {
- return false;
- }
+ BridgeImpl.log.info("Connecting bridge " + name + " to its
destination [" + nodeUUID.toString() + "], csf=" + this.csf);
- boolean retry = false;
- int retryCount = 0;
+ retryCount++;
- do
+ try
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its
destination [" + nodeUUID.toString() + "]");
+ if (csf == null || csf.isClosed())
+ {
+ csf = createSessionFactory();
+ // Session is pre-acknowledge
+ session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
+ try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
+ }
- try
+ if (forwardingAddress != null)
{
- if (csf == null || csf.isClosed())
+ BindingQuery query = null;
+
+ try
{
- csf = createSessionFactory();
- // Session is pre-acknowledge
- session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
- try
- {
- session.addMetaData("Session-for-bridge", name.toString());
- session.addMetaData("nodeUUID", nodeUUID.toString());
- }
- catch (Throwable dontCare)
- {
- // addMetaData here is just for debug purposes
- }
+ query = session.bindingQuery(forwardingAddress);
}
+ catch (Throwable e)
+ {
+ log.warn("Error on querying binding on bridge " + this.name +
". Retrying in 100 milliseconds", e);
+ // This was an issue during startup, we will not count this retry
+ retryCount--;
- if (forwardingAddress != null)
+ scheduleRetryConnectFixedTimeout(100);
+ return;
+ }
+
+ if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) ||
forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
{
- BindingQuery query = null;
-
- try
+ if (!query.isExists())
{
- query = session.bindingQuery(forwardingAddress);
+ log.warn("Address " + forwardingAddress +
+ " doesn't have any bindings yet, retry #(" +
+ retryCount +
+ ")");
+ scheduleRetryConnect();
+ return;
}
- catch (Throwable e)
+ }
+ else
+ {
+ if (!query.isExists())
{
- log.warn("Error on querying binding. Retrying", e);
- retry = true;
- Thread.sleep(100);
- continue;
+ log.info("Bridge " + this.getName() +
+ " connected to fowardingAddress=" +
+ this.getForwardingAddress() +
+ ". " +
+ getForwardingAddress() +
+ " doesn't have any bindings what means messages will
be ignored until a binding is created.");
}
-
- if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) ||
forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
- {
- if (!query.isExists())
- {
- retryCount ++;
- if (serverLocator.getReconnectAttempts() > 0)
- {
- if (retryCount > serverLocator.getReconnectAttempts())
- {
- log.warn("Retried " + forwardingAddress + " up
to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() +
"). Giving up now. The bridge " + this.getName() + " will not be
activated");
- return false;
- }
- }
-
- log.warn("Address " + forwardingAddress + "
doesn't have any bindings yet, retry #(" + retryCount + ")");
- Thread.sleep(serverLocator.getRetryInterval());
- retry = true;
- csf.close();
- session.close();
- continue;
- }
- }
- else
- {
- if (!query.isExists())
- {
- log.info("Bridge " + this.getName() + " connected to
fowardingAddress=" + this.getForwardingAddress() + ". " +
getForwardingAddress() + " doesn't have any bindings what means messages will be
ignored until a binding is created.");
- }
- }
}
+ }
- if (session == null)
- {
- // This can happen if the bridge is shutdown
- return false;
- }
+ producer = session.createProducer();
+ session.addFailureListener(BridgeImpl.this);
+ session.setSendAcknowledgementHandler(BridgeImpl.this);
- producer = session.createProducer();
- session.addFailureListener(BridgeImpl.this);
- session.setSendAcknowledgementHandler(BridgeImpl.this);
+ afterConnect();
- afterConnect();
+ active = true;
- active = true;
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
- queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync();
+ BridgeImpl.log.info("Bridge " + name + " is connected [" +
nodeUUID + "-> " + name + "]");
- BridgeImpl.log.info("Bridge " + name + " is connected ["
+ nodeUUID + "-> " + name +"]");
+ return;
+ }
+ catch (HornetQException e)
+ {
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ BridgeImpl.log.warn("Server is starting, retry to create the session for
bridge " + name);
- return true;
+ // We are not going to count this one as a retry
+ retryCount--;
+ scheduleRetryConnectFixedTimeout(100);
+ return;
}
- catch (HornetQException e)
+ else
{
- if (csf != null)
- {
- csf.close();
- }
+ BridgeImpl.log.warn("Bridge " + name + " is unable to connect
to destination. Retrying", e);
+ }
+ }
+ catch (Exception e)
+ {
+ BridgeImpl.log.warn("Bridge " + name + " is unable to connect to
destination. It will be disabled.", e);
+ }
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
- {
- BridgeImpl.log.warn("Server is starting, retry to create the session
for bridge " + name);
+ scheduleRetryConnect();
- // Sleep a little to prevent spinning too much
- try
- {
- Thread.sleep(10);
- }
- catch (InterruptedException ignore)
- {
- }
+ }
- retry = true;
+ protected void scheduleRetryConnect()
+ {
+ if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
+ {
+ log.warn("Bridge " + this.name +
+ " achieved " +
+ retryCount +
+ " maxattempts=" +
+ reconnectAttempts +
+ " it will stop retrying to reconnect");
+ fail(true);
+ return;
+ }
- continue;
- }
- else
- {
- BridgeImpl.log.warn("Bridge " + name + " is unable to
connect to destination. It will be disabled.", e);
+ long timeout = (long)(this.retryCount * this.retryMultiplier *
this.retryMultiplier);
+ if (timeout == 0)
+ {
+ timeout = this.retryInterval;
+ }
+ if (timeout > maxRetryInterval)
+ {
+ timeout = maxRetryInterval;
+ }
- return false;
- }
+ scheduleRetryConnectFixedTimeout(timeout);
+ }
+
+ protected void scheduleRetryConnectFixedTimeout(final long milliseconds)
+ {
+ if (csf != null)
+ {
+ try
+ {
+ csf.cleanup();
}
- catch (Exception e)
+ catch (Throwable ignored)
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect
to destination. It will be disabled.", e);
-
- return false;
}
}
- while (retry && !stopping);
- return false;
+ csf = null;
+ session = null;
+
+ futureScheduledReconnection = scheduledExecutor.schedule(new
FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
}
// Inner classes -------------------------------------------------
@@ -732,14 +786,6 @@
{
try
{
- // We need to close the session outside of the lock,
- // so any pending operation will be canceled right away
-
- // TODO: Why closing the CSF will make a few clustering and failover tests to
- // either deadlock or take forever on waiting
- // locks
- csf.close();
- csf = null;
if (session != null)
{
log.debug("Cleaning up session " + session);
@@ -747,6 +793,11 @@
session.removeFailureListener(BridgeImpl.this);
}
+ if (csf != null)
+ {
+ csf.close();
+ }
+
synchronized (BridgeImpl.this)
{
log.debug("Closing Session for bridge " +
BridgeImpl.this.name);
@@ -759,13 +810,6 @@
queue.removeConsumer(BridgeImpl.this);
- cancelRefs();
-
- if (queue != null)
- {
- queue.deliverAsync();
- }
-
log.info("stopped bridge " + name);
}
catch (Exception e)
@@ -783,23 +827,15 @@
{
synchronized (BridgeImpl.this)
{
- log.debug("Closing Session for bridge " +
BridgeImpl.this.name);
-
started = false;
active = false;
-
}
queue.removeConsumer(BridgeImpl.this);
- cancelRefs();
+ internalCancelReferences();
- if (queue != null)
- {
- queue.deliverAsync();
- }
-
log.info("paused bridge " + name);
}
catch (Exception e)
@@ -807,18 +843,33 @@
BridgeImpl.log.error("Failed to pause bridge", e);
}
}
+
}
- private class CreateObjectsRunnable implements Runnable
+ private void internalCancelReferences()
{
- public synchronized void run()
+ cancelRefs();
+
+ if (queue != null)
{
- if (!createObjects())
- {
- active = false;
+ queue.deliverAsync();
+ }
+ }
- started = false;
- }
+ // The scheduling will still use the main executor here
+ private class FutureConnectRunnable implements Runnable
+ {
+ public void run()
+ {
+ executor.execute(new ConnectRunnable());
}
}
+
+ private class ConnectRunnable implements Runnable
+ {
+ public synchronized void run()
+ {
+ connect();
+ }
+ }
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -44,6 +44,7 @@
* A ClusterConnectionBridge
*
* @author tim
+ * @author Clebert Suconic
*
*
*/
@@ -68,6 +69,10 @@
private final String targetNodeID;
public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final double retryMultiplier,
+ final long maxRetryInterval,
final UUID nodeUUID,
final String targetNodeID,
final SimpleString name,
@@ -88,6 +93,10 @@
final TransportConfiguration connector) throws
Exception
{
super(serverLocator,
+ reconnectAttempts,
+ retryInterval,
+ retryMultiplier,
+ maxRetryInterval,
nodeUUID,
name,
queue,
@@ -231,34 +240,15 @@
super.stop();
}
- @Override
- protected ClientSessionFactory createSessionFactory() throws Exception
+ protected void failed(final boolean permanently)
{
- //We create the session factory using the specified connector
+ super.fail(permanently);
- return serverLocator.createSessionFactory(connector);
- }
-
- @Override
- public void connectionFailed(HornetQException me, boolean failedOver)
- {
- if (isTrace)
- {
- log.trace("Connection Failed on ClusterConnectionBridge, failedOver = " +
failedOver + ", sessionClosed = " + session.isClosed(), new Exception
("trace"));
- }
-
- if (!failedOver && !session.isClosed())
+ if (permanently)
{
- try
- {
- session.cleanUp(true);
- }
- catch (Exception e)
- {
- log.warn("Unable to clean up the session after a connection
failure", e);
- }
+ log.debug("cluster node for bridge " + this.getName() + " is
permanently down");
serverLocator.notifyNodeDown(targetNodeID);
}
- super.connectionFailed(me, failedOver);
+
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -397,12 +397,17 @@
{
serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(reconnectAttempts);
+ serverLocator.setReconnectAttempts(0);
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
+// serverLocator.setInitialConnectAttempts(1);
+
+ serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ serverLocator.setConnectionTTL(connectionTTL);
+
if (serverLocator.getConfirmationWindowSize() < 0)
{
// We can't have confirmationSize = -1 on the cluster Bridge
@@ -466,7 +471,7 @@
{
log.trace("Closing clustering record " + record);
}
- record.pause();
+ record.close();
}
catch (Exception e)
{
@@ -481,6 +486,10 @@
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("node " + nodeID + " connectionPair = " +
connectorPair + " is up");
+ }
// discard notifications about ourselves unless its from our backup
if (nodeID.equals(nodeUUID.toString()))
@@ -677,24 +686,28 @@
protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
{
ClusterConnectionBridge bridge = new ClusterConnectionBridge(serverLocator,
- nodeUUID,
- record.getNodeID(),
- record.getQueueName(),
- record.getQueue(),
- executorFactory.getExecutor(),
- null,
- null,
- scheduledExecutor,
- null,
- useDuplicateDetection,
- clusterUser,
- clusterPassword,
- !backup,
- server.getStorageManager(),
-
managementService.getManagementAddress(),
-
managementService.getManagementNotificationAddress(),
- record,
- record.getConnector());
+ reconnectAttempts,
+ retryInterval,
+
retryIntervalMultiplier,
+ maxRetryInterval,
+ nodeUUID,
+ record.getNodeID(),
+
record.getQueueName(),
+ record.getQueue(),
+
executorFactory.getExecutor(),
+ null,
+ null,
+ scheduledExecutor,
+ null,
+
useDuplicateDetection,
+ clusterUser,
+ clusterPassword,
+ !backup,
+
server.getStorageManager(),
+
managementService.getManagementAddress(),
+
managementService.getManagementNotificationAddress(),
+ record,
+
record.getConnector());
return bridge;
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -687,12 +687,14 @@
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
- serverLocator.setReconnectAttempts(config.getReconnectAttempts());
- serverLocator.setRetryInterval(config.getRetryInterval());
- serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
- serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
+
+ // We are going to manually retry on the bridge in case of failure
+ serverLocator.setReconnectAttempts(0);
+ serverLocator.setInitialConnectAttempts(1);
+
+
+
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
- serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
if (!config.isUseDuplicateDetection())
@@ -702,6 +704,10 @@
}
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
+ config.getReconnectAttempts(),
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
nodeUUID,
new SimpleString(config.getName()),
queue,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-16
05:23:43 UTC (rev 10814)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-16
05:25:26 UTC (rev 10815)
@@ -641,7 +641,7 @@
{
log.trace("Sending Notification = " + notification +
", notificationEnabled=" + notificationsEnabled +
- " messagingServerControl=" + messagingServerControl, new
Exception ("trace"));
+ " messagingServerControl=" + messagingServerControl);
}
if (messagingServerControl != null && notificationsEnabled)
{