Author: clebert.suconic(a)jboss.com
Date: 2011-06-21 11:34:44 -0400 (Tue, 21 Jun 2011)
New Revision: 10866
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
Log:
tweaks
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -83,9 +83,9 @@
private static final long serialVersionUID = 2512460695662741413L;
private static final Logger log = Logger.getLogger(ClientSessionFactoryImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private static final boolean isDebug = log.isDebugEnabled();
// Attributes
@@ -206,7 +206,7 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
-
+
}
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection)
throws HornetQException
@@ -217,12 +217,11 @@
if (connection == null)
{
StringBuffer msg = new StringBuffer("Unable to connect to server using
configuration ").append(connectorConfig);
- if(backupConfig != null)
+ if (backupConfig != null)
{
msg.append(" and backup configuration ").append(backupConfig);
}
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- msg.toString());
+ throw new HornetQException(HornetQException.NOT_CONNECTED, msg.toString());
}
}
@@ -234,11 +233,11 @@
public void setBackupConnector(TransportConfiguration live, TransportConfiguration
backUp)
{
- if(live.equals(connectorConfig) && backUp != null)
+ if (live.equals(connectorConfig) && backUp != null)
{
if (isDebug)
{
- log.debug("Setting up backup config = " + backUp + " for
live = " + live);
+ log.debug("Setting up backup config = " + backUp + " for live
= " + live);
}
backupConfig = backUp;
}
@@ -246,7 +245,11 @@
{
if (isDebug)
{
- log.debug("ClientSessionFactoryImpl received backup update for
live/backup pair = " + live + " / " + backUp + " but it didn't
belong to " + this.connectorConfig);
+ log.debug("ClientSessionFactoryImpl received backup update for
live/backup pair = " + live +
+ " / " +
+ backUp +
+ " but it didn't belong to " +
+ this.connectorConfig);
}
}
}
@@ -381,7 +384,7 @@
sessions.remove(session);
}
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -458,10 +461,10 @@
}
closed = true;
-
+
serverLocator.factoryClosed(this);
}
-
+
public void cleanup()
{
if (closed)
@@ -473,39 +476,36 @@
causeExit();
synchronized (createSessionLock)
{
- synchronized (failoverLock)
+ HashSet<ClientSessionInternal> sessionsToClose;
+ synchronized (sessions)
{
- HashSet<ClientSessionInternal> sessionsToClose;
- synchronized (sessions)
+ sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ }
+ // work on a copied set. the session will be removed from sessions when
session.close() is called
+ for (ClientSessionInternal session : sessionsToClose)
+ {
+ try
{
- sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ session.cleanUp(false);
}
- // work on a copied set. the session will be removed from sessions when
session.close() is called
- for (ClientSessionInternal session : sessionsToClose)
+ catch (Exception e)
{
- try
- {
- session.cleanUp(false);
- }
- catch (Exception e)
- {
- log.warn("Unable to close session", e);
- }
+ log.warn("Unable to close session", e);
}
+ }
- checkCloseConnection();
- }
+ checkCloseConnection();
}
closed = true;
}
- public boolean isClosed()
- {
- return closed;
- }
+ public boolean isClosed()
+ {
+ return closed;
+ }
- public ServerLocator getServerLocator()
+ public ServerLocator getServerLocator()
{
return serverLocator;
}
@@ -517,7 +517,7 @@
{
stopPingingAfterOne = true;
}
-
+
public void resumePinging()
{
stopPingingAfterOne = false;
@@ -553,12 +553,11 @@
return;
}
-
if (isTrace)
{
log.trace("Client Connection failed, calling failure listeners and
trying to reconnect, reconnectAttempts=" + reconnectAttempts);
}
-
+
// We call before reconnection occurs to give the user a chance to do cleanup,
like cancel messages
callFailureListeners(me, false, false);
@@ -587,7 +586,6 @@
// It can then release the channel 1 lock, and retry (which will cause locking
on failoverLock
// until failover is complete
-
if (reconnectAttempts != 0)
{
lockChannel1();
@@ -928,7 +926,7 @@
{
sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
}
-
+
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -938,7 +936,7 @@
private void getConnectionWithRetry(final int reconnectAttempts)
{
log.info("getConnectionWithRetry::" + reconnectAttempts);
-
+
long interval = retryInterval;
int count = 0;
@@ -961,7 +959,7 @@
if (reconnectAttempts != 0)
{
count++;
-
+
if (reconnectAttempts != -1 && count == reconnectAttempts)
{
log.warn("Tried " + reconnectAttempts + " times to
connect. Now giving up on reconnecting it.");
@@ -971,19 +969,21 @@
if (isTrace)
{
- log.trace("Waiting " + interval +
- " milliseconds before next retry.
RetryInterval=" + retryInterval +
- " and multiplier = " +
retryIntervalMultiplier);
+ log.trace("Waiting " + interval +
+ " milliseconds before next retry.
RetryInterval=" +
+ retryInterval +
+ " and multiplier = " +
+ retryIntervalMultiplier);
}
-
+
try
{
- waitLock.wait(interval);
+ waitLock.wait(interval);
}
catch (InterruptedException ignore)
{
}
-
+
// Exponential back-off
long newInterval = (long)(interval * retryIntervalMultiplier);
@@ -1062,10 +1062,12 @@
try
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
-
+
if (log.isDebugEnabled())
{
- log.debug("Trying to connect with connector = " +
connectorFactory + ", parameters = " + connectorConfig.getParams());
+ log.debug("Trying to connect with connector = " +
connectorFactory +
+ ", parameters = " +
+ connectorConfig.getParams());
}
connector = connectorFactory.createConnector(connectorConfig.getParams(),
@@ -1083,7 +1085,7 @@
{
log.debug("Trying to connect at the main server using connector
:" + connectorConfig);
}
-
+
tc = connector.createConnection();
if (tc == null)
@@ -1092,7 +1094,7 @@
{
log.debug("Main server is not up. Hopefully there's a
backup configured now!");
}
-
+
try
{
connector.close();
@@ -1104,8 +1106,8 @@
connector = null;
}
}
- //if connection fails we can try the backup in case it has come live
- if(connector == null && backupConfig != null)
+ // if connection fails we can try the backup in case it has come live
+ if (connector == null && backupConfig != null)
{
if (isDebug)
{
@@ -1113,11 +1115,11 @@
}
ConnectorFactory backupConnectorFactory =
instantiateConnectorFactory(backupConfig.getFactoryClassName());
connector =
backupConnectorFactory.createConnector(backupConfig.getParams(),
- handler,
- this,
- closeExecutor,
- threadPool,
- scheduledThreadPool);
+ handler,
+ this,
+ closeExecutor,
+ threadPool,
+ scheduledThreadPool);
if (connector != null)
{
connector.start();
@@ -1130,7 +1132,7 @@
{
log.debug("Backup is not active yet");
}
-
+
try
{
connector.close();
@@ -1144,12 +1146,12 @@
else
{
/*looks like the backup is now live, lets use that*/
-
+
if (isDebug)
{
log.debug("Connected to the backup at " +
backupConfig);
}
-
+
connectorConfig = backupConfig;
backupConfig = null;
@@ -1162,7 +1164,7 @@
{
if (isTrace)
{
- log.trace("No Backup configured!", new Exception
("trace"));
+ log.trace("No Backup configured!", new
Exception("trace"));
}
}
}
@@ -1215,7 +1217,7 @@
{
log.debug("Defined connection " + connection);
}
-
+
connection.addFailureListener(new
DelegatingFailureListener(connection.getID()));
Channel channel0 = connection.getChannel(0, -1);
@@ -1249,7 +1251,7 @@
{
log.debug("Subscribing Topology");
}
-
+
channel0.send(new
SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
if (serverLocator.isClusterConnection())
{
@@ -1258,9 +1260,7 @@
{
log.debug("Announcing node " + serverLocator.getNodeID() +
", isBackup=" + serverLocator.isBackup());
}
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
- serverLocator.isBackup(),
- config));
+ channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
serverLocator.isBackup(), config));
}
}
}
@@ -1324,7 +1324,7 @@
if (connection != null)
{
Channel channel1 = connection.getChannel(1, -1);
-
+
if (channel1 != null)
{
channel1.returnBlocking();
@@ -1365,7 +1365,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
-
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for
a long time and fail can
@@ -1400,11 +1400,13 @@
{
if (isDebug)
{
- log.debug("Node " + topMessage.getNodeID() + " going up,
connector = " + topMessage.getPair() + ", isLast=" + topMessage.isLast());
+ log.debug("Node " + topMessage.getNodeID() +
+ " going up, connector = " +
+ topMessage.getPair() +
+ ", isLast=" +
+ topMessage.isLast());
}
- serverLocator.notifyNodeUp(topMessage.getNodeID(),
- topMessage.getPair(),
- topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
}
}
}
@@ -1477,8 +1479,8 @@
first = false;
long now = System.currentTimeMillis();
-
- if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now
>= lastCheck + connectionTTL )
+
+ if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now
>= lastCheck + connectionTTL)
{
if (!connection.checkDataReceived())
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -18,8 +18,19 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
@@ -665,258 +676,258 @@
}
}
- public synchronized boolean isHA()
+ public boolean isHA()
{
return ha;
}
- public synchronized boolean isCacheLargeMessagesClient()
+ public boolean isCacheLargeMessagesClient()
{
return cacheLargeMessagesClient;
}
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ public void setCacheLargeMessagesClient(final boolean cached)
{
cacheLargeMessagesClient = cached;
}
- public synchronized long getClientFailureCheckPeriod()
+ public long getClientFailureCheckPeriod()
{
return clientFailureCheckPeriod;
}
- public synchronized void setClientFailureCheckPeriod(final long
clientFailureCheckPeriod)
+ public void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
{
checkWrite();
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
}
- public synchronized long getConnectionTTL()
+ public long getConnectionTTL()
{
return connectionTTL;
}
- public synchronized void setConnectionTTL(final long connectionTTL)
+ public void setConnectionTTL(final long connectionTTL)
{
checkWrite();
this.connectionTTL = connectionTTL;
}
- public synchronized long getCallTimeout()
+ public long getCallTimeout()
{
return callTimeout;
}
- public synchronized void setCallTimeout(final long callTimeout)
+ public void setCallTimeout(final long callTimeout)
{
checkWrite();
this.callTimeout = callTimeout;
}
- public synchronized int getMinLargeMessageSize()
+ public int getMinLargeMessageSize()
{
return minLargeMessageSize;
}
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ public void setMinLargeMessageSize(final int minLargeMessageSize)
{
checkWrite();
this.minLargeMessageSize = minLargeMessageSize;
}
- public synchronized int getConsumerWindowSize()
+ public int getConsumerWindowSize()
{
return consumerWindowSize;
}
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ public void setConsumerWindowSize(final int consumerWindowSize)
{
checkWrite();
this.consumerWindowSize = consumerWindowSize;
}
- public synchronized int getConsumerMaxRate()
+ public int getConsumerMaxRate()
{
return consumerMaxRate;
}
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ public void setConsumerMaxRate(final int consumerMaxRate)
{
checkWrite();
this.consumerMaxRate = consumerMaxRate;
}
- public synchronized int getConfirmationWindowSize()
+ public int getConfirmationWindowSize()
{
return confirmationWindowSize;
}
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ public void setConfirmationWindowSize(final int confirmationWindowSize)
{
checkWrite();
this.confirmationWindowSize = confirmationWindowSize;
}
- public synchronized int getProducerWindowSize()
+ public int getProducerWindowSize()
{
return producerWindowSize;
}
- public synchronized void setProducerWindowSize(final int producerWindowSize)
+ public void setProducerWindowSize(final int producerWindowSize)
{
checkWrite();
this.producerWindowSize = producerWindowSize;
}
- public synchronized int getProducerMaxRate()
+ public int getProducerMaxRate()
{
return producerMaxRate;
}
- public synchronized void setProducerMaxRate(final int producerMaxRate)
+ public void setProducerMaxRate(final int producerMaxRate)
{
checkWrite();
this.producerMaxRate = producerMaxRate;
}
- public synchronized boolean isBlockOnAcknowledge()
+ public boolean isBlockOnAcknowledge()
{
return blockOnAcknowledge;
}
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ public void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
{
checkWrite();
this.blockOnAcknowledge = blockOnAcknowledge;
}
- public synchronized boolean isBlockOnDurableSend()
+ public boolean isBlockOnDurableSend()
{
return blockOnDurableSend;
}
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ public void setBlockOnDurableSend(final boolean blockOnDurableSend)
{
checkWrite();
this.blockOnDurableSend = blockOnDurableSend;
}
- public synchronized boolean isBlockOnNonDurableSend()
+ public boolean isBlockOnNonDurableSend()
{
return blockOnNonDurableSend;
}
- public synchronized void setBlockOnNonDurableSend(final boolean
blockOnNonDurableSend)
+ public void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
{
checkWrite();
this.blockOnNonDurableSend = blockOnNonDurableSend;
}
- public synchronized boolean isAutoGroup()
+ public boolean isAutoGroup()
{
return autoGroup;
}
- public synchronized void setAutoGroup(final boolean autoGroup)
+ public void setAutoGroup(final boolean autoGroup)
{
checkWrite();
this.autoGroup = autoGroup;
}
- public synchronized boolean isPreAcknowledge()
+ public boolean isPreAcknowledge()
{
return preAcknowledge;
}
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ public void setPreAcknowledge(final boolean preAcknowledge)
{
checkWrite();
this.preAcknowledge = preAcknowledge;
}
- public synchronized int getAckBatchSize()
+ public int getAckBatchSize()
{
return ackBatchSize;
}
- public synchronized void setAckBatchSize(final int ackBatchSize)
+ public void setAckBatchSize(final int ackBatchSize)
{
checkWrite();
this.ackBatchSize = ackBatchSize;
}
- public synchronized boolean isUseGlobalPools()
+ public boolean isUseGlobalPools()
{
return useGlobalPools;
}
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ public void setUseGlobalPools(final boolean useGlobalPools)
{
checkWrite();
this.useGlobalPools = useGlobalPools;
}
- public synchronized int getScheduledThreadPoolMaxSize()
+ public int getScheduledThreadPoolMaxSize()
{
return scheduledThreadPoolMaxSize;
}
- public synchronized void setScheduledThreadPoolMaxSize(final int
scheduledThreadPoolMaxSize)
+ public void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
{
checkWrite();
this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
}
- public synchronized int getThreadPoolMaxSize()
+ public int getThreadPoolMaxSize()
{
return threadPoolMaxSize;
}
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ public void setThreadPoolMaxSize(final int threadPoolMaxSize)
{
checkWrite();
this.threadPoolMaxSize = threadPoolMaxSize;
}
- public synchronized long getRetryInterval()
+ public long getRetryInterval()
{
return retryInterval;
}
- public synchronized void setRetryInterval(final long retryInterval)
+ public void setRetryInterval(final long retryInterval)
{
checkWrite();
this.retryInterval = retryInterval;
}
- public synchronized long getMaxRetryInterval()
+ public long getMaxRetryInterval()
{
return maxRetryInterval;
}
- public synchronized void setMaxRetryInterval(final long retryInterval)
+ public void setMaxRetryInterval(final long retryInterval)
{
checkWrite();
maxRetryInterval = retryInterval;
}
- public synchronized double getRetryIntervalMultiplier()
+ public double getRetryIntervalMultiplier()
{
return retryIntervalMultiplier;
}
- public synchronized void setRetryIntervalMultiplier(final double
retryIntervalMultiplier)
+ public void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
{
checkWrite();
this.retryIntervalMultiplier = retryIntervalMultiplier;
}
- public synchronized int getReconnectAttempts()
+ public int getReconnectAttempts()
{
return reconnectAttempts;
}
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ public void setReconnectAttempts(final int reconnectAttempts)
{
checkWrite();
this.reconnectAttempts = reconnectAttempts;
@@ -933,23 +944,23 @@
return initialConnectAttempts;
}
- public synchronized boolean isFailoverOnInitialConnection()
+ public boolean isFailoverOnInitialConnection()
{
return this.failoverOnInitialConnection;
}
- public synchronized void setFailoverOnInitialConnection(final boolean failover)
+ public void setFailoverOnInitialConnection(final boolean failover)
{
checkWrite();
this.failoverOnInitialConnection = failover;
}
- public synchronized String getConnectionLoadBalancingPolicyClassName()
+ public String getConnectionLoadBalancingPolicyClassName()
{
return connectionLoadBalancingPolicyClassName;
}
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String
loadBalancingPolicyClassName)
+ public void setConnectionLoadBalancingPolicyClassName(final String
loadBalancingPolicyClassName)
{
checkWrite();
connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
@@ -975,12 +986,12 @@
return interceptors.remove(interceptor);
}
- public synchronized int getInitialMessagePacketSize()
+ public int getInitialMessagePacketSize()
{
return initialMessagePacketSize;
}
- public synchronized void setInitialMessagePacketSize(final int size)
+ public void setInitialMessagePacketSize(final int size)
{
checkWrite();
initialMessagePacketSize = size;
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -213,7 +213,7 @@
{
if (isTrace)
{
- log.trace("Receiving notification : " + notification);
+ log.trace("Receiving notification : " + notification + " on
server " + this.server);
}
synchronized (notificationLock)
{
@@ -471,7 +471,10 @@
String uid = UUIDGenerator.getInstance().generateStringUUID();
- System.out.println("Seding notification for addBinding " + binding);
+ if (isTrace)
+ {
+ log.trace("Seding notification for addBinding " + binding + "
from server " + server);
+ }
managementService.sendNotification(new Notification(uid,
NotificationType.BINDING_ADDED, props));
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -60,6 +60,8 @@
String getIdentity();
+ String describe();
+
Configuration getConfiguration();
RemotingService getRemotingService();
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -15,11 +15,11 @@
import java.util.Map;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.HornetQServer;
/**
* A ClusterConnection
@@ -35,6 +35,8 @@
SimpleString getName();
String getNodeID();
+
+ HornetQServer getServer();
/**
* @return a Map of node ID and addresses
@@ -46,7 +48,5 @@
TransportConfiguration getConnector();
// for debug
- String description();
-
- void nodeAnnounced(String nodeID,
Pair<TransportConfiguration,TransportConfiguration> connectorPair);
+ String describe();
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -59,4 +59,9 @@
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
+
+ /**
+ * @return
+ */
+ String describe();
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -28,11 +28,11 @@
import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
@@ -77,7 +77,7 @@
private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new
SimpleString("jms.topic.");
- protected final ServerLocatorInternal serverLocator;
+ protected final ServerLocator serverLocator;
private final UUID nodeUUID;
@@ -140,7 +140,7 @@
// Public --------------------------------------------------------
- public BridgeImpl(final ServerLocatorInternal serverLocator,
+ public BridgeImpl(final ServerLocator serverLocator,
final int reconnectAttempts,
final long retryInterval,
final double retryMultiplier,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -24,7 +24,7 @@
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -52,9 +52,9 @@
public class ClusterConnectionBridge extends BridgeImpl
{
private static final Logger log = Logger.getLogger(ClusterConnectionBridge.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
-
+
private final ClusterConnection clusterConnection;
private final MessageFlowRecord flowRecord;
@@ -69,8 +69,13 @@
private final String targetNodeID;
+ private final TransportConfiguration connector;
+
+ private final ServerLocatorInternal discoveryLocator;
+
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
- final ServerLocatorInternal serverLocator,
+ final ServerLocator targetLocator,
+ final ServerLocatorInternal discoveryLocator,
final int reconnectAttempts,
final long retryInterval,
final double retryMultiplier,
@@ -94,7 +99,7 @@
final MessageFlowRecord flowRecord,
final TransportConfiguration connector) throws
Exception
{
- super(serverLocator,
+ super(targetLocator,
reconnectAttempts,
retryInterval,
retryMultiplier,
@@ -112,18 +117,21 @@
password,
activated,
storageManager);
-
+
System.out.println("ClusterConnectionBridge");
+ this.discoveryLocator = discoveryLocator;
+
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
-
+
this.clusterConnection = clusterConnection;
this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
-
+ this.connector = connector;
+
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
}
@@ -161,7 +169,16 @@
private void setupNotificationConsumer() throws Exception
{
- log.debug("Setting up notificationConsumer for " + flowRecord + " on
bridge " + this.getName());
+ if (log.isDebugEnabled())
+ {
+ log.debug("Setting up notificationConsumer between " +
this.clusterConnection.getConnector() +
+ " and " +
+ flowRecord.getBridge().getForwardingConnection() +
+ " clusterConnection = " +
+ this.clusterConnection.getName() +
+ " on server " +
+ clusterConnection.getServer());
+ }
if (flowRecord != null)
{
flowRecord.reset();
@@ -170,7 +187,9 @@
{
try
{
- log.debug("Closing notification Consumer for reopening " +
notifConsumer + " on bridge " + this.getName());
+ log.debug("Closing notification Consumer for reopening " +
notifConsumer +
+ " on bridge " +
+ this.getName());
notifConsumer.close();
notifConsumer = null;
@@ -183,7 +202,9 @@
// Get the queue data
- String qName = "notif." +
UUIDGenerator.getInstance().generateStringUUID();
+ String qName = "notif." +
UUIDGenerator.getInstance().generateStringUUID() +
+ "." +
+ clusterConnection.getServer();
SimpleString notifQueueName = new SimpleString(qName);
@@ -236,7 +257,7 @@
{
log.debug("Cluster connetion bridge on " + clusterConnection +
" requesting information on queues");
}
-
+
prod.send(message);
}
}
@@ -247,23 +268,23 @@
System.out.println("afterConnect");
setupNotificationConsumer();
}
-
+
@Override
public void stop() throws Exception
{
super.stop();
}
-
+
protected void failed(final boolean permanently)
{
log.debug("Cluster Bridge " + this.getName() + " failed,
permanently=" + permanently);
super.fail(permanently);
-
+
if (permanently)
{
log.debug("cluster node for bridge " + this.getName() + " is
permanently down");
- serverLocator.notifyNodeDown(targetNodeID);
+ discoveryLocator.notifyNodeDown(targetNodeID);
}
-
+
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -16,6 +16,8 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -367,6 +370,11 @@
{
return nodeUUID.toString();
}
+
+ public HornetQServer getServer()
+ {
+ return server;
+ }
public synchronized Map<String, String> getNodes()
{
@@ -388,7 +396,10 @@
return;
}
- log.debug("Activating cluster connection nodeID=" + nodeUUID);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Activating cluster connection nodeID=" + nodeUUID + "
for server=" + this.server);
+ }
backup = false;
@@ -522,7 +533,7 @@
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null)
{
- return;
+ return;
}
/*we dont create bridges to backups*/
if(connectorPair.a == null)
@@ -538,95 +549,7 @@
if (record == null)
{
- // New node - create a new flow record
- final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
-
- Binding queueBinding = postOffice.getBinding(queueName);
-
- Queue queue;
-
- if (queueBinding != null)
- {
- queue = (Queue)queueBinding.getBindable();
- }
- else
- {
- // Add binding in storage so the queue will get reloaded on startup and
we can find it - it's never
- // actually routed to at that address though
- queue = server.createQueue(queueName, queueName, null, true, false);
- }
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
- }
- else
- {
- log.info("Reattaching nodeID=" + nodeID);
- }
- }
- catch (Exception e)
- {
- log.error("Failed to update topology", e);
- }
- }
- }
-
- public void nodeAnnounced(final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair)
- {
- log.warn(this + " WTF nodeAnnounced nodeID=" + nodeID, new Exception
("trace"));
- if (log.isDebugEnabled())
- {
- log.debug(this + " received nodeAnnouncedUp for " + nodeID + ",
connectorPair=" + connectorPair);
- }
-
- if (nodeID.equals(nodeUUID.toString()))
- {
- return;
- }
-
- // if the node is more than 1 hop away, we do not create a bridge for direct
cluster connection
- if (allowDirectConnectionsOnly &&
!allowableConnections.contains(connectorPair.a))
- {
- if (log.isDebugEnabled())
- {
- log.debug("Ignoring nodeUp message as it only allows direct
connections");
- }
- return;
- }
-
- // FIXME required to prevent cluster connections w/o discovery group
- // and empty static connectors to create bridges... ulgy!
- if (serverLocator == null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Ignoring nodeUp as serverLocator==null");
- }
- return;
- }
- /*we dont create bridges to backups*/
- if(connectorPair.a == null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Igoring nodeup as connectorPair.a==null (backup)");
- }
- return;
- }
-
- synchronized (records)
- {
- if (isTrace)
- {
- log.trace("Adding record for nodeID=" + nodeID);
- }
- try
- {
- MessageFlowRecord record = records.get(nodeID);
-
- if (record == null)
- {
// New node - create a new flow record
final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
@@ -648,13 +571,6 @@
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
- else
- {
- if (isTrace)
- {
- log.trace("It already had a node created before, ignoring the
nodeUp message");
- }
- }
}
catch (Exception e)
{
@@ -662,19 +578,24 @@
}
}
}
-
- private void createNewRecord(final String nodeID,
+
+ private void createNewRecord(final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue,
final boolean start) throws Exception
{
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(nodeID, connector,
queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector,
queueName, queue);
- records.put(nodeID, record);
+ records.put(targetNodeID, record);
- Bridge bridge = createBridge(record);
+ Bridge bridge = createClusteredBridge(record);
+ if (log.isDebugEnabled())
+ {
+ log.debug("PORRA creating record between " + this.connector + "
and " + connector + bridge);
+ }
+
record.setBridge(bridge);
if (start)
@@ -688,16 +609,36 @@
* @return
* @throws Exception
*/
- protected Bridge createBridge(MessageFlowRecordImpl record) throws Exception
+ protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
+
+ ServerLocator targetLocator =
HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+
+ targetLocator.setReconnectAttempts(0);
+
+ targetLocator.setInitialConnectAttempts(0);
+ targetLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ targetLocator.setConnectionTTL(connectionTTL);
+ targetLocator.setInitialConnectAttempts(0);
+
+
targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
+ targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
+ targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+
+ if(retryInterval > 0)
+ {
+ targetLocator.setRetryInterval(retryInterval);
+ }
+
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
+ targetLocator,
serverLocator,
reconnectAttempts,
retryInterval,
retryIntervalMultiplier,
maxRetryInterval,
nodeUUID,
- record.getNodeID(),
+
record.getTargetNodeID(),
record.getQueueName(),
record.getQueue(),
executorFactory.getExecutor(),
@@ -724,7 +665,7 @@
{
private Bridge bridge;
- private final String nodeID;
+ private final String targetNodeID;
private final TransportConfiguration connector;
private final SimpleString queueName;
private final Queue queue;
@@ -733,21 +674,43 @@
private volatile boolean isClosed = false;
- private volatile boolean paused = false;
-
private volatile boolean firstReset = false;
- public MessageFlowRecordImpl(final String nodeID,
+ public MessageFlowRecordImpl(final String targetNodeID,
final TransportConfiguration connector,
final SimpleString queueName,
final Queue queue)
{
this.queue = queue;
- this.nodeID = nodeID;
+ this.targetNodeID = targetNodeID;
this.connector = connector;
this.queueName = queueName;
}
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "MessageFlowRecordImpl [nodeID=" + targetNodeID +
+ ", connector=" +
+ connector +
+ ", queueName=" +
+ queueName +
+ ", queue=" +
+ queue +
+ ", isClosed=" +
+ isClosed +
+ ", firstReset=" +
+ firstReset +
+ "]";
+ }
+
+
+
public String getAddress()
{
return address.toString();
@@ -756,9 +719,9 @@
/**
* @return the nodeID
*/
- public String getNodeID()
+ public String getTargetNodeID()
{
- return nodeID;
+ return targetNodeID;
}
/**
@@ -1026,11 +989,6 @@
log.trace("Adding binding " + clusterName + " into " +
ClusterConnectionImpl.this);
}
- synchronized (System.err)
- {
- new Exception("Adding binding " + clusterName + " into "
+ ClusterConnectionImpl.this).printStackTrace(System.out);
- }
-
bindings.put(clusterName, binding);
try
@@ -1214,18 +1172,24 @@
return "ClusterConnectionImpl [nodeUUID=" + nodeUUID + ",
connector=" + connector + ", address=" + address + "]";
}
- public String description()
+ public String describe()
{
- String out = name + " connected to\n";
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+
+ out.println(this);
+ out.println("***************************************");
+ out.println(name + " connected to");
for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
{
- String nodeID = messageFlow.getKey();
- Bridge bridge = messageFlow.getValue().getBridge();
-
- out += "\t" + nodeID + " -- " + bridge.isStarted() +
"\n";
+ out.println("\t Bridge = " + messageFlow.getValue().getBridge());
+ out.println("\t Flow Record = " + messageFlow.getValue());
}
+ out.println("***************************************");
- return out;
+
+ return str.toString();
}
interface ClusterConnector
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -15,6 +15,8 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -142,7 +144,26 @@
this.clustered = clustered;
}
+
+ public String describe()
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ out.println("Information on " + this);
+ out.println("*******************************************************");
+ out.println("Topology: " + topology.describe());
+
+ for (ClusterConnection conn : this.clusterConnections.values())
+ {
+ out.println(conn.describe());
+ }
+
+ out.println("*******************************************************");
+ return str.toString();
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -648,7 +669,7 @@
// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
- serverLocator.setInitialConnectAttempts(1);
+ serverLocator.setInitialConnectAttempts(-1);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -14,6 +14,8 @@
package org.hornetq.core.server.impl;
import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.nio.channels.ClosedChannelException;
import java.security.AccessController;
@@ -830,7 +832,16 @@
// HornetQServer implementation
// -----------------------------------------------------------
-
+ public String describe()
+ {
+ StringWriter str = new StringWriter();
+ PrintWriter out = new PrintWriter(str);
+
+ out.println("Information about server " + this.identity);
+ out.println("Cluster Connection:" +
this.getClusterManager().describe());
+
+ return str.toString();
+ }
public void setIdentity(String identity)
{
this.identity = identity;
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -443,6 +443,10 @@
{
try
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("deleting temporary queue " + bindingName);
+ }
if (postOffice.getBinding(bindingName) != null)
{
postOffice.removeBinding(bindingName);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -673,6 +673,10 @@
//
https://jira.jboss.org/jira/browse/HORNETQ-317
if (messagingServer == null || !messagingServer.isInitialised())
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ignoring message " + notification + " as the
server is not initialized");
+ }
return;
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21
15:03:33 UTC (rev 10865)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-21
15:34:44 UTC (rev 10866)
@@ -38,7 +38,7 @@
StringBuffer sb = new StringBuffer();
- sb.append("[").append(Thread.currentThread().getName()).append("]
");
+ sb.append("* [").append(Thread.currentThread().getName()).append("]
");
sb.append(calendar.get(GregorianCalendar.HOUR_OF_DAY) + ":" +
calendar.get(GregorianCalendar.MINUTE) +
":" +
@@ -49,6 +49,7 @@
sb.append(record.getLevel()).append(" [");
sb.append(stripPackage(record.getLoggerName())).append("]").append("
");
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
sb.append(record.getMessage());
sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
@@ -66,6 +67,8 @@
{
}
}
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+
return sb.toString();
}