JBoss hornetq SVN: r10821 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 21:48:36 -0400 (Thu, 16 Jun 2011)
New Revision: 10821
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
tracing
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-17 01:48:12 UTC (rev 10820)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-17 01:48:36 UTC (rev 10821)
@@ -1461,6 +1461,7 @@
}
catch (HornetQException e)
{
+ log.debug("Exception on establish connector initial connection", e);
if (!interrupted)
{
this.e = e;
@@ -1500,6 +1501,18 @@
factory = null;
}
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "Connector [initialConnector=" + initialConnector + "]";
+ }
+
+
+
}
}
}
13 years, 6 months
JBoss hornetq SVN: r10820 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 21:48:12 -0400 (Thu, 16 Jun 2011)
New Revision: 10820
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
tracing
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17 00:31:24 UTC (rev 10819)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17 01:48:12 UTC (rev 10820)
@@ -85,6 +85,8 @@
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
private static final boolean isTrace = log.isTraceEnabled();
+
+ private static final boolean isDebug = log.isDebugEnabled();
// Attributes
// -----------------------------------------------------------------------------------
@@ -234,7 +236,7 @@
{
if(live.equals(connectorConfig) && backUp != null)
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Setting up backup config = " + backUp + " for live = " + live);
}
@@ -242,7 +244,7 @@
}
else
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + this.connectorConfig);
}
@@ -928,6 +930,8 @@
private void getConnectionWithRetry(final int reconnectAttempts)
{
+ log.info("getConnectionWithRetry::" + reconnectAttempts);
+
long interval = retryInterval;
int count = 0;
@@ -936,7 +940,7 @@
{
while (!exitLoop)
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Trying reconnection attempt " + count);
}
@@ -985,6 +989,7 @@
}
else
{
+ log.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
return;
}
}
@@ -1062,7 +1067,7 @@
{
connector.start();
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Trying to connect at the main server using connector :" + connectorConfig);
}
@@ -1071,7 +1076,7 @@
if (tc == null)
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Main server is not up. Hopefully there's a backup configured now!");
}
@@ -1090,7 +1095,7 @@
//if connection fails we can try the backup in case it has come live
if(connector == null && backupConfig != null)
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Trying backup config = " + backupConfig);
}
@@ -1109,7 +1114,7 @@
if (tc == null)
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Backup is not active yet");
}
@@ -1128,7 +1133,7 @@
{
/*looks like the backup is now live, lets use that*/
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Connected to the backup at " + backupConfig);
}
@@ -1145,7 +1150,7 @@
{
if (isTrace)
{
- log.trace("No Backup configured!");
+ log.trace("No Backup configured!", new Exception ("trace"));
}
}
}
@@ -1185,11 +1190,20 @@
if (tc == null)
{
+ if (isDebug)
+ {
+ log.debug("returning connection = " + connection + " as tc == null");
+ }
return connection;
}
connection = new RemotingConnectionImpl(tc, callTimeout, interceptors);
+ if (isDebug)
+ {
+ log.debug("Defined connection " + connection);
+ }
+
connection.addFailureListener(new DelegatingFailureListener(connection.getID()));
Channel channel0 = connection.getChannel(0, -1);
@@ -1219,10 +1233,19 @@
if (serverLocator.isHA())
{
+ if (isDebug)
+ {
+ log.debug("Subscribing Topology");
+ }
+
channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
if (serverLocator.isClusterConnection())
{
TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
+ if (isDebug)
+ {
+ log.debug("Announcing node " + serverLocator.getNodeID() + ", isBackup=" + serverLocator.isBackup());
+ }
channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
serverLocator.isBackup(),
config));
@@ -1349,7 +1372,7 @@
if (topMessage.isExit())
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Notifying " + topMessage.getNodeID() + " going down");
}
@@ -1357,7 +1380,7 @@
}
else
{
- if (log.isDebugEnabled())
+ if (isDebug)
{
log.debug("Node " + topMessage.getNodeID() + " going up, connector = " + topMessage.getPair() + ", isLast=" + topMessage.isLast());
}
13 years, 6 months
JBoss hornetq SVN: r10819 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 20:31:24 -0400 (Thu, 16 Jun 2011)
New Revision: 10819
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
Log:
optimizing logger
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-17 00:01:21 UTC (rev 10818)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-17 00:31:24 UTC (rev 10819)
@@ -14,8 +14,8 @@
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
import java.util.logging.LogRecord;
/**
@@ -25,18 +25,30 @@
{
private static String LINE_SEPARATOR = System.getProperty("line.separator");
+ private String stripPackage(String clazzName)
+ {
+ return clazzName.substring(clazzName.lastIndexOf(".") + 1);
+ }
+
@Override
public String format(final LogRecord record)
{
- Date date = new Date();
- SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss,SSS");
+ Calendar calendar = GregorianCalendar.getInstance();
+ calendar.setTimeInMillis(record.getMillis());
+
StringBuffer sb = new StringBuffer();
- // Minimize memory allocations here.
- date.setTime(record.getMillis());
+
sb.append("[").append(Thread.currentThread().getName()).append("] ");
- sb.append(dateFormat.format(date)).append(" ");
+ sb.append(calendar.get(GregorianCalendar.HOUR_OF_DAY) + ":" +
+ calendar.get(GregorianCalendar.MINUTE) +
+ ":" +
+ calendar.get(GregorianCalendar.SECOND) +
+ "," +
+ calendar.get(GregorianCalendar.MILLISECOND) +
+ " ");
+
sb.append(record.getLevel()).append(" [");
- sb.append(record.getLoggerName()).append("]").append(" ");
+ sb.append(stripPackage(record.getLoggerName())).append("]").append(" ");
sb.append(record.getMessage());
sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
13 years, 6 months
JBoss hornetq SVN: r10818 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 20:01:21 -0400 (Thu, 16 Jun 2011)
New Revision: 10818
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
Leak on connection factories
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-16 15:27:12 UTC (rev 10817)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17 00:01:21 UTC (rev 10818)
@@ -449,6 +449,8 @@
}
closed = true;
+
+ serverLocator.factoryClosed(this);
}
public void cleanup()
13 years, 6 months
JBoss hornetq SVN: r10817 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 11:27:12 -0400 (Thu, 16 Jun 2011)
New Revision: 10817
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
cluster cleanup
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16 12:01:39 UTC (rev 10816)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16 15:27:12 UTC (rev 10817)
@@ -185,7 +185,7 @@
started = true;
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
if (!started)
{
@@ -338,7 +338,7 @@
topology.sendTopology(listener);
}
- public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
if (clusterConnection)
13 years, 6 months
JBoss hornetq SVN: r10816 - trunk/tests/timing-tests.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-16 08:01:39 -0400 (Thu, 16 Jun 2011)
New Revision: 10816
Modified:
trunk/tests/timing-tests/pom.xml
Log:
Fix dependency declaration.
Modified: trunk/tests/timing-tests/pom.xml
===================================================================
--- trunk/tests/timing-tests/pom.xml 2011-06-16 05:25:26 UTC (rev 10815)
+++ trunk/tests/timing-tests/pom.xml 2011-06-16 12:01:39 UTC (rev 10816)
@@ -20,7 +20,6 @@
<version>2.2.3-SNAPSHOT</version>
</parent>
- <groupId>org.hornetq.tests</groupId>
<artifactId>timing-tests</artifactId>
<packaging>jar</packaging>
<name>HornetQ timing Tests</name>
@@ -52,7 +51,6 @@
<artifactId>hornetq-jms</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
13 years, 6 months
JBoss hornetq SVN: r10815 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/client/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 01:25:26 -0400 (Thu, 16 Jun 2011)
New Revision: 10815
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
Some cleanup on bridges
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -133,10 +133,16 @@
int ackBatchSize) throws HornetQException;
void close();
+
+ /**
+ * Opposed to close, will call cleanup only on every created session and children objects.
+ */
+ void cleanup();
ServerLocator getServerLocator();
CoreRemotingConnection getConnection();
boolean isClosed();
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -129,7 +129,7 @@
private final long maxRetryInterval;
- private final int reconnectAttempts;
+ private int reconnectAttempts;
private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<SessionFailureListener>();
@@ -450,7 +450,45 @@
closed = true;
}
+
+ public void cleanup()
+ {
+ if (closed)
+ {
+ return;
+ }
+ // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
+ causeExit();
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ HashSet<ClientSessionInternal> sessionsToClose;
+ synchronized (sessions)
+ {
+ sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ }
+ // work on a copied set. the session will be removed from sessions when session.close() is called
+ for (ClientSessionInternal session : sessionsToClose)
+ {
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to close session", e);
+ }
+ }
+
+ checkCloseConnection();
+ }
+ }
+
+ closed = true;
+ }
+
public boolean isClosed()
{
return closed;
@@ -1447,4 +1485,13 @@
cancelled = true;
}
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.client.impl.ClientSessionFactoryInternal#setReconnectAttempts(int)
+ */
+ public void setReconnectAttempts(int attempts)
+ {
+ this.reconnectAttempts = attempts;
+ }
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -46,4 +46,6 @@
void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp);
Object getBackupConnector();
+
+ void setReconnectAttempts(int i);
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -1457,7 +1457,7 @@
{
try
{
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -18,6 +18,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
@@ -28,6 +30,7 @@
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
@@ -53,6 +56,7 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
*
* Created 12 Nov 2008 11:37:35
*
@@ -64,13 +68,13 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(BridgeImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
-
+
private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
-
+
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
protected final ServerLocatorInternal serverLocator;
@@ -83,6 +87,11 @@
protected final Executor executor;
+ protected final ScheduledExecutorService scheduledExecutor;
+
+ /** Used when there's a scheduled reconnection */
+ protected ScheduledFuture<?> futureScheduledReconnection;
+
private final Filter filter;
private final SimpleString forwardingAddress;
@@ -102,7 +111,7 @@
private final boolean useDuplicateDetection;
private volatile boolean active;
-
+
private volatile boolean stopping;
private final String user;
@@ -111,6 +120,18 @@
private boolean activated;
+ private final int reconnectAttempts;
+
+ private int reconnectAttemptsInUse;
+
+ private final long retryInterval;
+
+ private final double retryMultiplier;
+
+ private final long maxRetryInterval;
+
+ private int retryCount = 0;
+
private NotificationService notificationService;
// Static --------------------------------------------------------
@@ -120,6 +141,10 @@
// Public --------------------------------------------------------
public BridgeImpl(final ServerLocatorInternal serverLocator,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final double retryMultiplier,
+ final long maxRetryInterval,
final UUID nodeUUID,
final SimpleString name,
final Queue queue,
@@ -134,6 +159,17 @@
final boolean activated,
final StorageManager storageManager) throws Exception
{
+
+ this.reconnectAttempts = reconnectAttempts;
+
+ this.reconnectAttemptsInUse = -1;
+
+ this.retryInterval = retryInterval;
+
+ this.retryMultiplier = retryMultiplier;
+
+ this.maxRetryInterval = maxRetryInterval;
+
this.serverLocator = serverLocator;
this.nodeUUID = nodeUUID;
@@ -144,6 +180,8 @@
this.executor = executor;
+ this.scheduledExecutor = scheduledExecutor;
+
filter = FilterImpl.createFilter(filterString);
this.forwardingAddress = forwardingAddress;
@@ -187,7 +225,7 @@
}
}
- private void cancelRefs() throws Exception
+ private void cancelRefs()
{
MessageReference ref;
@@ -201,25 +239,32 @@
}
list.addFirst(ref);
}
-
+
if (isTrace && list.isEmpty())
{
- log.trace("didn't have any references to cancel on bridge " + this);
+ log.trace("didn't have any references to cancel on bridge " + this);
}
- Queue queue = null;
-
+ Queue refqueue = null;
+
long timeBase = System.currentTimeMillis();
for (MessageReference ref2 : list)
{
- queue = ref2.getQueue();
+ refqueue = ref2.getQueue();
- queue.cancel(ref2, timeBase);
+ try
+ {
+ refqueue.cancel(ref2, timeBase);
+ }
+ catch (Exception e)
+ {
+ // There isn't much we can do besides log an error
+ log.error("Couldn't cancel reference " + ref2, e);
+ }
}
+ }
- }
-
public void flushExecutor()
{
// Wait for any create objects runnable to complete
@@ -235,18 +280,22 @@
}
}
-
public void stop() throws Exception
{
- if (log.isDebugEnabled())
- {
- log.debug("Bridge " + this.name + " being stopped");
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being stopped");
+ }
+
+ stopping = true;
- stopping = true;
+ if (futureScheduledReconnection != null)
+ {
+ futureScheduledReconnection.cancel(true);
+ }
executor.execute(new StopRunnable());
-
+
if (notificationService != null)
{
TypedProperties props = new TypedProperties();
@@ -265,10 +314,10 @@
public void pause() throws Exception
{
- if (log.isDebugEnabled())
- {
- log.debug("Bridge " + this.name + " being paused");
- }
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this.name + " being paused");
+ }
executor.execute(new PauseRunnable());
@@ -288,13 +337,13 @@
}
}
- public void resume() throws Exception
- {
- queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync();
- }
+ public void resume() throws Exception
+ {
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
+ }
- public boolean isStarted()
+ public boolean isStarted()
{
return started;
}
@@ -303,7 +352,7 @@
{
activated = true;
- executor.execute(new CreateObjectsRunnable());
+ executor.execute(new ConnectRunnable());
}
public SimpleString getName()
@@ -377,7 +426,7 @@
{
// We keep our own DuplicateID for the Bridge, so bouncing back and forths will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
-
+
message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
}
@@ -396,13 +445,13 @@
public static byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID)
{
byte[] bytes = new byte[24];
-
+
ByteBuffer bb = ByteBuffer.wrap(bytes);
-
+
bb.put(nodeUUID.asBytes());
-
+
bb.putLong(messageID);
-
+
return bytes;
}
@@ -421,16 +470,16 @@
return HandleStatus.BUSY;
}
- if (isTrace)
- {
- log.trace("Bridge " + name + " is handling reference=" + ref);
- }
+ if (isTrace)
+ {
+ log.trace("Bridge " + name + " is handling reference=" + ref);
+ }
ref.handled();
ServerMessage message = ref.getMessage();
refs.add(ref);
-
+
message = beforeForward(message);
SimpleString dest;
@@ -444,10 +493,10 @@
// Preserve the original address
dest = message.getAddress();
}
- //if we failover during send then there is a chance that the
- //that this will throw a disconnect, we need to remove the message
- //from the acks so it will get resent, duplicate detection will cope
- //with any messages resent
+ // if we failover during send then there is a chance that the
+ // that this will throw a disconnect, we need to remove the message
+ // from the acks so it will get resent, duplicate detection will cope
+ // with any messages resent
try
{
producer.send(dest, message);
@@ -467,14 +516,34 @@
// FailureListener implementation --------------------------------
- public void connectionFailed(final HornetQException me, boolean failedOver)
+ public final void connectionFailed(final HornetQException me, boolean failedOver)
{
log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
if (isTrace)
{
- log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me + ", boolean failedOver=" + failedOver);
+ log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me +
+ ", boolean failedOver=" +
+ failedOver);
}
+ try
+ {
+ csf.cleanup();
+ }
+ catch (Throwable dontCare)
+ {
+ }
+
+ try
+ {
+ session.cleanUp(false);
+ }
+ catch (Throwable dontCare)
+ {
+ }
+
fail(false);
+
+ scheduleRetryConnect();
}
public void beforeReconnect(final HornetQException exception)
@@ -482,8 +551,6 @@
log.warn(name + "::Connection failed before reconnect ", exception);
fail(true);
}
-
-
// Package protected ---------------------------------------------
@@ -497,8 +564,8 @@
@Override
public String toString()
{
- return this.getClass().getName() +
- " [name=" + name +
+ return this.getClass().getName() + " [name=" +
+ name +
", nodeUUID=" +
nodeUUID +
", queue=" +
@@ -516,212 +583,199 @@
"]";
}
- private void fail(final boolean beforeReconnect)
+ protected void fail(final boolean permanently)
{
- // This will get called even after the bridge reconnects - in this case
- // we want to cancel all unacked refs so they get resent
- // duplicate detection will ensure no dups are routed on the other side
+ log.debug(name + "::BridgeImpl::fail being called, permanently=" + permanently);
- log.debug(name + "::BridgeImpl::fail being called, beforeReconnect=" + beforeReconnect);
+ if (queue != null)
+ {
+ try
+ {
+ queue.removeConsumer(this);
+ }
+ catch (Exception dontcare)
+ {
+ log.debug(dontcare);
+ }
+ }
- if (session.getConnection().isDestroyed())
+ cancelRefs();
+ if (queue != null)
{
- log.debug(name + "::Connection is destroyed, active = false now");
- active = false;
+ queue.deliverAsync();
}
-
-
- if (!session.getConnection().isDestroyed())
- {
- if (beforeReconnect)
- {
- try {
- log.debug(name + "::Connection is destroyed, active = false now");
-
- cancelRefs();
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to cancel refs", e);
- }
- }
- else
- {
- try
- {
- afterConnect();
-
- log.debug(name + "::After reconnect, setting active=true now");
- active = true;
-
- if (queue != null)
- {
- queue.deliverAsync();
- }
- }
- catch (Exception e)
- {
- BridgeImpl.log.error("Failed to call after connect", e);
- }
- }
- }
}
/* Hook for doing extra stuff after connection */
protected void afterConnect() throws Exception
{
- //NOOP
+ retryCount = 0;
+ reconnectAttemptsInUse = reconnectAttempts;
}
/* Hook for creating session factory */
- protected ClientSessionFactory createSessionFactory() throws Exception
+ protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
- return serverLocator.createSessionFactory();
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)serverLocator.createSessionFactory();
+ // csf.setReconnectAttempts(0);
+ //csf.setInitialReconnectAttempts(1);
+ return csf;
}
/* This is called only when the bridge is activated */
- protected synchronized boolean createObjects()
+ protected void connect()
{
- if (!started)
- {
- return false;
- }
+ BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
- boolean retry = false;
- int retryCount = 0;
+ retryCount++;
- do
+ try
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "]");
+ if (csf == null || csf.isClosed())
+ {
+ csf = createSessionFactory();
+ // Session is pre-acknowledge
+ session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+ try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
+ }
- try
+ if (forwardingAddress != null)
{
- if (csf == null || csf.isClosed())
+ BindingQuery query = null;
+
+ try
{
- csf = createSessionFactory();
- // Session is pre-acknowledge
- session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
- try
- {
- session.addMetaData("Session-for-bridge", name.toString());
- session.addMetaData("nodeUUID", nodeUUID.toString());
- }
- catch (Throwable dontCare)
- {
- // addMetaData here is just for debug purposes
- }
+ query = session.bindingQuery(forwardingAddress);
}
+ catch (Throwable e)
+ {
+ log.warn("Error on querying binding on bridge " + this.name + ". Retrying in 100 milliseconds", e);
+ // This was an issue during startup, we will not count this retry
+ retryCount--;
- if (forwardingAddress != null)
+ scheduleRetryConnectFixedTimeout(100);
+ return;
+ }
+
+ if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
{
- BindingQuery query = null;
-
- try
+ if (!query.isExists())
{
- query = session.bindingQuery(forwardingAddress);
+ log.warn("Address " + forwardingAddress +
+ " doesn't have any bindings yet, retry #(" +
+ retryCount +
+ ")");
+ scheduleRetryConnect();
+ return;
}
- catch (Throwable e)
+ }
+ else
+ {
+ if (!query.isExists())
{
- log.warn("Error on querying binding. Retrying", e);
- retry = true;
- Thread.sleep(100);
- continue;
+ log.info("Bridge " + this.getName() +
+ " connected to fowardingAddress=" +
+ this.getForwardingAddress() +
+ ". " +
+ getForwardingAddress() +
+ " doesn't have any bindings what means messages will be ignored until a binding is created.");
}
-
- if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX))
- {
- if (!query.isExists())
- {
- retryCount ++;
- if (serverLocator.getReconnectAttempts() > 0)
- {
- if (retryCount > serverLocator.getReconnectAttempts())
- {
- log.warn("Retried " + forwardingAddress + " up to the configured reconnectAttempts(" + serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + this.getName() + " will not be activated");
- return false;
- }
- }
-
- log.warn("Address " + forwardingAddress + " doesn't have any bindings yet, retry #(" + retryCount + ")");
- Thread.sleep(serverLocator.getRetryInterval());
- retry = true;
- csf.close();
- session.close();
- continue;
- }
- }
- else
- {
- if (!query.isExists())
- {
- log.info("Bridge " + this.getName() + " connected to fowardingAddress=" + this.getForwardingAddress() + ". " + getForwardingAddress() + " doesn't have any bindings what means messages will be ignored until a binding is created.");
- }
- }
}
+ }
- if (session == null)
- {
- // This can happen if the bridge is shutdown
- return false;
- }
+ producer = session.createProducer();
+ session.addFailureListener(BridgeImpl.this);
+ session.setSendAcknowledgementHandler(BridgeImpl.this);
- producer = session.createProducer();
- session.addFailureListener(BridgeImpl.this);
- session.setSendAcknowledgementHandler(BridgeImpl.this);
+ afterConnect();
- afterConnect();
+ active = true;
- active = true;
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
- queue.addConsumer(BridgeImpl.this);
- queue.deliverAsync();
+ BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name + "]");
- BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name +"]");
+ return;
+ }
+ catch (HornetQException e)
+ {
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ BridgeImpl.log.warn("Server is starting, retry to create the session for bridge " + name);
- return true;
+ // We are not going to count this one as a retry
+ retryCount--;
+ scheduleRetryConnectFixedTimeout(100);
+ return;
}
- catch (HornetQException e)
+ else
{
- if (csf != null)
- {
- csf.close();
- }
+ BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. Retrying", e);
+ }
+ }
+ catch (Exception e)
+ {
+ BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+ }
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
- {
- BridgeImpl.log.warn("Server is starting, retry to create the session for bridge " + name);
+ scheduleRetryConnect();
- // Sleep a little to prevent spinning too much
- try
- {
- Thread.sleep(10);
- }
- catch (InterruptedException ignore)
- {
- }
+ }
- retry = true;
+ protected void scheduleRetryConnect()
+ {
+ if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
+ {
+ log.warn("Bridge " + this.name +
+ " achieved " +
+ retryCount +
+ " maxattempts=" +
+ reconnectAttempts +
+ " it will stop retrying to reconnect");
+ fail(true);
+ return;
+ }
- continue;
- }
- else
- {
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+ long timeout = (long)(this.retryCount * this.retryMultiplier * this.retryMultiplier);
+ if (timeout == 0)
+ {
+ timeout = this.retryInterval;
+ }
+ if (timeout > maxRetryInterval)
+ {
+ timeout = maxRetryInterval;
+ }
- return false;
- }
+ scheduleRetryConnectFixedTimeout(timeout);
+ }
+
+ protected void scheduleRetryConnectFixedTimeout(final long milliseconds)
+ {
+ if (csf != null)
+ {
+ try
+ {
+ csf.cleanup();
}
- catch (Exception e)
+ catch (Throwable ignored)
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
-
- return false;
}
}
- while (retry && !stopping);
- return false;
+ csf = null;
+ session = null;
+
+ futureScheduledReconnection = scheduledExecutor.schedule(new FutureConnectRunnable(), milliseconds, TimeUnit.MILLISECONDS);
}
// Inner classes -------------------------------------------------
@@ -732,14 +786,6 @@
{
try
{
- // We need to close the session outside of the lock,
- // so any pending operation will be canceled right away
-
- // TODO: Why closing the CSF will make a few clustering and failover tests to
- // either deadlock or take forever on waiting
- // locks
- csf.close();
- csf = null;
if (session != null)
{
log.debug("Cleaning up session " + session);
@@ -747,6 +793,11 @@
session.removeFailureListener(BridgeImpl.this);
}
+ if (csf != null)
+ {
+ csf.close();
+ }
+
synchronized (BridgeImpl.this)
{
log.debug("Closing Session for bridge " + BridgeImpl.this.name);
@@ -759,13 +810,6 @@
queue.removeConsumer(BridgeImpl.this);
- cancelRefs();
-
- if (queue != null)
- {
- queue.deliverAsync();
- }
-
log.info("stopped bridge " + name);
}
catch (Exception e)
@@ -783,23 +827,15 @@
{
synchronized (BridgeImpl.this)
{
- log.debug("Closing Session for bridge " + BridgeImpl.this.name);
-
started = false;
active = false;
-
}
queue.removeConsumer(BridgeImpl.this);
- cancelRefs();
+ internalCancelReferences();
- if (queue != null)
- {
- queue.deliverAsync();
- }
-
log.info("paused bridge " + name);
}
catch (Exception e)
@@ -807,18 +843,33 @@
BridgeImpl.log.error("Failed to pause bridge", e);
}
}
+
}
- private class CreateObjectsRunnable implements Runnable
+ private void internalCancelReferences()
{
- public synchronized void run()
+ cancelRefs();
+
+ if (queue != null)
{
- if (!createObjects())
- {
- active = false;
+ queue.deliverAsync();
+ }
+ }
- started = false;
- }
+ // The scheduling will still use the main executor here
+ private class FutureConnectRunnable implements Runnable
+ {
+ public void run()
+ {
+ executor.execute(new ConnectRunnable());
}
}
+
+ private class ConnectRunnable implements Runnable
+ {
+ public synchronized void run()
+ {
+ connect();
+ }
+ }
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -44,6 +44,7 @@
* A ClusterConnectionBridge
*
* @author tim
+ * @author Clebert Suconic
*
*
*/
@@ -68,6 +69,10 @@
private final String targetNodeID;
public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
+ final int reconnectAttempts,
+ final long retryInterval,
+ final double retryMultiplier,
+ final long maxRetryInterval,
final UUID nodeUUID,
final String targetNodeID,
final SimpleString name,
@@ -88,6 +93,10 @@
final TransportConfiguration connector) throws Exception
{
super(serverLocator,
+ reconnectAttempts,
+ retryInterval,
+ retryMultiplier,
+ maxRetryInterval,
nodeUUID,
name,
queue,
@@ -231,34 +240,15 @@
super.stop();
}
- @Override
- protected ClientSessionFactory createSessionFactory() throws Exception
+ protected void failed(final boolean permanently)
{
- //We create the session factory using the specified connector
+ super.fail(permanently);
- return serverLocator.createSessionFactory(connector);
- }
-
- @Override
- public void connectionFailed(HornetQException me, boolean failedOver)
- {
- if (isTrace)
- {
- log.trace("Connection Failed on ClusterConnectionBridge, failedOver = " + failedOver + ", sessionClosed = " + session.isClosed(), new Exception ("trace"));
- }
-
- if (!failedOver && !session.isClosed())
+ if (permanently)
{
- try
- {
- session.cleanUp(true);
- }
- catch (Exception e)
- {
- log.warn("Unable to clean up the session after a connection failure", e);
- }
+ log.debug("cluster node for bridge " + this.getName() + " is permanently down");
serverLocator.notifyNodeDown(targetNodeID);
}
- super.connectionFailed(me, failedOver);
+
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -397,12 +397,17 @@
{
serverLocator.setNodeID(nodeUUID.toString());
- serverLocator.setReconnectAttempts(reconnectAttempts);
+ serverLocator.setReconnectAttempts(0);
serverLocator.setClusterConnection(true);
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
+// serverLocator.setInitialConnectAttempts(1);
+
+ serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ serverLocator.setConnectionTTL(connectionTTL);
+
if (serverLocator.getConfirmationWindowSize() < 0)
{
// We can't have confirmationSize = -1 on the cluster Bridge
@@ -466,7 +471,7 @@
{
log.trace("Closing clustering record " + record);
}
- record.pause();
+ record.close();
}
catch (Exception e)
{
@@ -481,6 +486,10 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("node " + nodeID + " connectionPair = " + connectorPair + " is up");
+ }
// discard notifications about ourselves unless its from our backup
if (nodeID.equals(nodeUUID.toString()))
@@ -677,24 +686,28 @@
protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
{
ClusterConnectionBridge bridge = new ClusterConnectionBridge(serverLocator,
- nodeUUID,
- record.getNodeID(),
- record.getQueueName(),
- record.getQueue(),
- executorFactory.getExecutor(),
- null,
- null,
- scheduledExecutor,
- null,
- useDuplicateDetection,
- clusterUser,
- clusterPassword,
- !backup,
- server.getStorageManager(),
- managementService.getManagementAddress(),
- managementService.getManagementNotificationAddress(),
- record,
- record.getConnector());
+ reconnectAttempts,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ nodeUUID,
+ record.getNodeID(),
+ record.getQueueName(),
+ record.getQueue(),
+ executorFactory.getExecutor(),
+ null,
+ null,
+ scheduledExecutor,
+ null,
+ useDuplicateDetection,
+ clusterUser,
+ clusterPassword,
+ !backup,
+ server.getStorageManager(),
+ managementService.getManagementAddress(),
+ managementService.getManagementNotificationAddress(),
+ record,
+ record.getConnector());
return bridge;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -687,12 +687,14 @@
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
- serverLocator.setReconnectAttempts(config.getReconnectAttempts());
- serverLocator.setRetryInterval(config.getRetryInterval());
- serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
- serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
+
+ // We are going to manually retry on the bridge in case of failure
+ serverLocator.setReconnectAttempts(0);
+ serverLocator.setInitialConnectAttempts(1);
+
+
+
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
- serverLocator.setInitialConnectAttempts(config.getReconnectAttempts());
serverLocator.setBlockOnDurableSend(!config.isUseDuplicateDetection());
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
if (!config.isUseDuplicateDetection())
@@ -702,6 +704,10 @@
}
clusterLocators.add(serverLocator);
Bridge bridge = new BridgeImpl(serverLocator,
+ config.getReconnectAttempts(),
+ config.getRetryInterval(),
+ config.getRetryIntervalMultiplier(),
+ config.getMaxRetryInterval(),
nodeUUID,
new SimpleString(config.getName()),
queue,
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-16 05:23:43 UTC (rev 10814)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-16 05:25:26 UTC (rev 10815)
@@ -641,7 +641,7 @@
{
log.trace("Sending Notification = " + notification +
", notificationEnabled=" + notificationsEnabled +
- " messagingServerControl=" + messagingServerControl, new Exception ("trace"));
+ " messagingServerControl=" + messagingServerControl);
}
if (messagingServerControl != null && notificationsEnabled)
{
13 years, 6 months
JBoss hornetq SVN: r10814 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-16 01:23:43 -0400 (Thu, 16 Jun 2011)
New Revision: 10814
Added:
branches/Branch_2_2_EAP_cluster_clean2/
Log:
cluster cleanup
13 years, 6 months
JBoss hornetq SVN: r10813 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-15 13:40:45 -0400 (Wed, 15 Jun 2011)
New Revision: 10813
Removed:
branches/Branch_2_2_EAP-cluster-cleanup/
Log:
deleting temporary branch
13 years, 6 months
JBoss hornetq SVN: r10812 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-15 12:52:17 -0400 (Wed, 15 Jun 2011)
New Revision: 10812
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-15 15:15:58 UTC (rev 10811)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-06-15 16:52:17 UTC (rev 10812)
@@ -847,7 +847,10 @@
return;
}
- log.debug("Calling close on session " + this);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Calling close on session " + this);
+ }
try
{
@@ -1095,13 +1098,19 @@
public void addMetaDataV1(String key, String data) throws HornetQException
{
- metadata.put(key, data);
+ synchronized (metadata)
+ {
+ metadata.put(key, data);
+ }
channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
}
public void addMetaData(String key, String data) throws HornetQException
{
- metadata.put(key, data);
+ synchronized (metadata)
+ {
+ metadata.put(key, data);
+ }
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
@@ -1617,12 +1626,23 @@
public String toString()
{
StringBuffer buffer = new StringBuffer();
- for (Map.Entry<String, String> entry : metadata.entrySet())
+ synchronized (metadata)
{
- buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
+ for (Map.Entry<String, String> entry : metadata.entrySet())
+ {
+ buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
+ }
}
-
- return "ClientSessionImpl [name=" + name + ", username=" + username + ", closed=" + closed + " metaData=(" + buffer + ")]@" + Integer.toHexString(hashCode()) ;
+
+ return "ClientSessionImpl [name=" + name +
+ ", username=" +
+ username +
+ ", closed=" +
+ closed +
+ " metaData=(" +
+ buffer +
+ ")]@" +
+ Integer.toHexString(hashCode());
}
// Protected
13 years, 6 months