Author: clebert.suconic
Date: 2011-10-29 17:33:03 -0400 (Sat, 29 Oct 2011)
New Revision: 11617
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
Log:
JBPAPP-7333 - Fixing reconnection on clustered bridge
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -54,6 +54,15 @@
* Create a ClientSessionFactory to a specific server. The server must already be
known about by this ServerLocator.
* This method allows the user to make a connection to a specific server bypassing any
load balancing policy in force
* @param transportConfiguration
+ * @return The ClientSessionFactory or null if the node is not present on the topology
+ * @throws Exception if a failure happened in creating the ClientSessionFactory or the
ServerLocator does not know about the passed in transportConfiguration
+ */
+ ClientSessionFactory createSessionFactory(final String nodeID) throws Exception;
+
+ /**
+ * Create a ClientSessionFactory to a specific server. The server must already be
known about by this ServerLocator.
+ * This method allows the user to make a connection to a specific server bypassing any
load balancing policy in force
+ * @param transportConfiguration
* @return The ClientSesionFactory
* @throws Exception if a failure happened in creating the ClientSessionFactory or the
ServerLocator does not know about the passed in transportConfiguration
*/
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -598,6 +598,40 @@
{
return afterConnectListener;
}
+
+   /**
+    * Creates a ClientSessionFactory towards the node registered under the given node ID in
+    * this locator's topology.
+    * <p>
+    * The member returned by topology.getMember(nodeID) is resolved as follows:
+    * <ul>
+    * <li>no member for the ID: returns null;</li>
+    * <li>getA() is set: connects to A and, when getB() is also set, registers the A/B pair
+    * on the new factory through setBackupConnector;</li>
+    * <li>only getB() is set: connects to B;</li>
+    * <li>neither is set: returns null.</li>
+    * </ul>
+    * The full topology and the resolved member are logged at info level on every call.
+    * <p>
+    * NOTE(review): A and B look like the live and backup connectors of the member; confirm
+    * against TopologyMember.
+    *
+    * @param nodeID the ID used to look the target node up in the topology
+    * @return the factory connected to the node, or null if the node is not in the topology
+    *         or the member carries no connector
+    * @throws Exception if creating the underlying ClientSessionFactory fails
+    */
+   public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
+   {
+      log.info(topology.describe("full topology"));
+      TopologyMember topologyMember = topology.getMember(nodeID);
+
+      log.info("Creating connection factory towards " + nodeID + " = " + topologyMember);
+
+      if (topologyMember == null)
+      {
+         return null;
+      }
+      else if (topologyMember.getA() != null)
+      {
+         ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
+         if (topologyMember.getB() != null)
+         {
+            factory.setBackupConnector(topologyMember.getA(), topologyMember.getB());
+         }
+         return factory;
+      }
+      else if (topologyMember.getA() == null && topologyMember.getB() != null)
+      {
+         // This shouldn't happen, however I wanted this to consider all possible cases
+         ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getB());
+         return factory;
+      }
+      else
+      {
+         // it shouldn't happen
+         return null;
+      }
+   }
public ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -815,6 +815,11 @@
PagingStoreImpl.log.warn("Messages are being dropped on address
" + getStoreName());
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Message " + message + " beig dropped for
fullAddressPolicy==DROP");
+ }
// Address is full, we just pretend we are paging, and drop the data
return true;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -595,6 +595,10 @@
{
binding.route(message, context);
}
+ else
+ {
+ log.warn("Couldn't find binding with id=" + bindingID + "
on routeFromCluster for message=" + message + " binding = " + this);
+ }
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -611,6 +611,19 @@
{
bindings.route(message, context);
}
+ else
+ {
+ // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
+ if (log.isDebugEnabled())
+ {
+ log.debug("Couldn't find any bindings for address=" + address +
" on message=" + message);
+ }
+ }
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Message after routed=" + message);
+ }
if (context.getQueueCount() == 0)
{
@@ -625,6 +638,11 @@
// Send to the DLA for the address
SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("sending message to dla address = " + dlaAddress +
", message=" + message);
+ }
if (dlaAddress == null)
{
@@ -641,6 +659,13 @@
route(message, context.getTransaction(), false);
}
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Message " + message + " is not going anywhere as
it didn't have a binding on address:" + address);
+ }
+ }
}
else
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -52,6 +52,11 @@
{
throw new IllegalStateException("Binding already exists " + binding);
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Adding binding " + binding + " with address = " +
binding.getUniqueName(), new Exception ("trace"));
+ }
return addMappingInternal(binding.getAddress(), binding);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -514,6 +514,7 @@
{
if (requiresResponse)
{
+ log.debug("Sending exception to client", e);
response = new SessionXAResponseMessage(true, e.errorCode,
e.getMessage());
}
else
@@ -525,6 +526,7 @@
{
if (requiresResponse)
{
+ log.debug("Sending exception to client", e);
response = new HornetQExceptionMessage((HornetQException)e);
}
else
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -701,6 +701,12 @@
if (csf == null || csf.isClosed())
{
csf = createSessionFactory();
+ if (csf == null)
+ {
+ // Retrying. This probably means the node is not available (for the cluster connection case)
+ scheduleRetryConnect();
+ return;
+ }
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false,
true, true, true, 1);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -74,8 +74,6 @@
private final long targetNodeEventUID;
- private final TransportConfiguration connector;
-
private final ServerLocatorInternal discoveryLocator;
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
@@ -138,7 +136,6 @@
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
- this.connector = connector;
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
@@ -152,7 +149,14 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
- ClientSessionFactoryInternal factory = super.createSessionFactory();
+ ClientSessionFactoryInternal factory =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
+
+ if (factory == null)
+ {
+ log.warn("NodeID=" + targetNodeID + " is not available on the
topology. Retrying the connection to that node now");
+ return null;
+ }
+ factory.setReconnectAttempts(0);
factory.getConnection().addFailureListener(new FailureListener()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -311,6 +311,11 @@
buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Adding remoteQueue ID = " + remoteQueueID + " into
message=" + message + " store-forward-queue=" + storeAndForwardQueue);
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java 2011-10-28
13:00:52 UTC (rev 11616)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -84,6 +84,11 @@
// properly on ack, since the original address will be overwritten
// TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Diverting message " + message + " into " +
this);
+ }
long id = storageManager.generateUniqueID();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-28
13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -961,6 +961,7 @@
sb.append(getJMSMessageID());
sb.append("]:");
sb.append(message.isDurable() ? "PERSISTENT" :
"NON-PERSISTENT");
+ sb.append(",coreMessage=" + message);
return sb.toString();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-28
13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-29
21:33:03 UTC (rev 11617)
@@ -25,8 +25,12 @@
import static org.hornetq.utils.DataConstants.SHORT;
import static org.hornetq.utils.DataConstants.STRING;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
@@ -138,7 +142,7 @@
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
-
+
public void putNullValue(final SimpleString key)
{
checkCreateProperties();
@@ -604,9 +608,65 @@
@Override
public String toString()
{
- return "TypedProperties[" + properties + "]";
+ StringWriter strWriter = new StringWriter();
+ PrintWriter out = new PrintWriter(strWriter);
+ Iterator<Entry<SimpleString, PropertyValue>> propIter =
properties.entrySet().iterator();
+ while (propIter.hasNext())
+ {
+ Entry<SimpleString, PropertyValue> item = propIter.next();
+
+ // When debugging TypedProperties, we need to identify the actual array on PrintData and other log outputs
+ if (item.getKey().toString().startsWith("_HQ_ROUTE_TO"))
+ {
+ String outstr = toLongArray(item.getValue());
+
+ out.print(item.getKey() + "=" + outstr);
+ }
+ else
+ {
+ out.print(item.getKey() + "=" + item.getValue());
+ }
+ if (propIter.hasNext())
+ {
+ out.print(",");
+ }
+ }
+ return "TypedProperties[" + strWriter.toString() + "]";
}
+ /**
+ * Renders a property value holding a byte array as a bracketed, comma-separated list of longs.
+ * The value is cast to byte[], wrapped in a ByteBuffer and consumed with getLong() until no
+ * bytes remain. If that fails for any reason, the failure is logged at warn level and the
+ * result of value.toString() is placed between the brackets instead.
+ * toString() uses this for entries whose key starts with _HQ_ROUTE_TO.
+ *
+ * @param value the property value to render
+ * @return the decoded longs, or the fallback text, enclosed in square brackets
+ */
+ private String toLongArray(PropertyValue value)
+ {
+ StringBuffer outstr = new StringBuffer();
+
+ try
+ {
+ byte[] ids = (byte [])value.getValue();
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ while (buff.hasRemaining())
+ {
+ long bindingID = buff.getLong();
+ outstr.append(bindingID);
+ if (buff.hasRemaining())
+ {
+ outstr.append(",");
+ }
+ }
+ }
+ catch (Throwable e) // also reached when the value is not a byte[] or fewer than 8 bytes remain for getLong()
+ {
+ log.warn(e.getMessage(), e);
+ outstr = new StringBuffer(); // discard any IDs appended before the failure
+ outstr.append(value.toString());
+ }
+ return "[" + outstr.toString() + "]";
+ }
+
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties()