JBoss hornetq SVN: r11624 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-31 18:43:35 -0400 (Mon, 31 Oct 2011)
New Revision: 11624
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Log:
Fix test: validate that the large-message directory is empty before stopping the server
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-31 22:34:59 UTC (rev 11623)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-31 22:43:35 UTC (rev 11624)
@@ -13,36 +13,30 @@
package org.hornetq.tests.integration.client;
-import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import junit.framework.Assert;
-
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.client.impl.ClientConsumerInternal;
-import org.hornetq.core.config.Configuration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
-import org.hornetq.tests.util.RandomUtil;
-import org.hornetq.tests.util.UnitTestCase;
/**
* A LargeMessageTest
@@ -332,9 +326,9 @@
server.stop(false);
server.start();
- server.stop();
-
validateNoFilesOnLargeDir();
+
+ server.stop();
}
finally
{
13 years, 1 month
JBoss hornetq SVN: r11623 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-31 18:34:59 -0400 (Mon, 31 Oct 2011)
New Revision: 11623
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
Log:
removing possibly invalid test
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-10-31 22:27:51 UTC (rev 11622)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-10-31 22:34:59 UTC (rev 11623)
@@ -96,8 +96,10 @@
/*
* Don't wait for the response queue bindings to get to the other side
+ *
+ * TODO: I believe this test is invalid. I'm just ignoring it for now. It will probably go away
*/
- public void testRequestResponseNoWaitForBindings() throws Exception
+ public void invalidTest_testRequestResponseNoWaitForBindings() throws Exception
{
setupCluster();
13 years, 1 month
JBoss hornetq SVN: r11622 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-31 18:27:51 -0400 (Mon, 31 Oct 2011)
New Revision: 11622
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
fixing a deadlock on discoveryGroup.stop()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 22:27:51 UTC (rev 11622)
@@ -1243,9 +1243,9 @@
state = STATE.CLOSING;
- synchronized (this)
+ if (discoveryGroup != null)
{
- if (discoveryGroup != null)
+ synchronized (this)
{
try
{
@@ -1256,11 +1256,11 @@
log.error("Failed to stop discovery group", e);
}
}
- else
- {
- staticConnector.disconnect();
- }
}
+ else
+ {
+ staticConnector.disconnect();
+ }
synchronized (connectingFactories)
{
13 years, 1 month
JBoss hornetq SVN: r11621 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-31 15:20:04 -0400 (Mon, 31 Oct 2011)
New Revision: 11621
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
Fixing tests: TopologyClusterTestBase was wrong (it needed to wait for the topology before starting the ServerLocator), and a few synchronization blocks were missing on Discovery initialization
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -57,11 +57,15 @@
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
- /*needed for backward compatibility*/
+ /*needed for backward compatibility*/
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
/*end of compatibility fixes*/
- private enum STATE{ INITIALIZED, CLOSED, CLOSING};
-
+ private enum STATE
+ {
+ INITIALIZED, CLOSED, CLOSING
+ };
+
private static final long serialVersionUID = -1615857864410205260L;
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -75,6 +79,7 @@
private transient String identity;
private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+
private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -530,7 +535,7 @@
return pair.getA();
}
-
+
// Get from initialconnectors
int pos = loadBalancingPolicy.select(initialConnectors.length);
@@ -601,20 +606,19 @@
{
return afterConnectListener;
}
-
+
public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
{
log.info(topology.describe("full topology"));
TopologyMember topologyMember = topology.getMember(nodeID);
-
+
log.info("Creating connection factory towards " + nodeID + " = " + topologyMember);
-
+
if (topologyMember == null)
{
return null;
}
- else
- if (topologyMember.getA() != null)
+ else if (topologyMember.getA() != null)
{
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
if (topologyMember.getB() != null)
@@ -641,7 +645,7 @@
assertOpen();
initialise();
-
+
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
@@ -1239,31 +1243,34 @@
state = STATE.CLOSING;
- if (discoveryGroup != null)
+ synchronized (this)
{
- try
+ if (discoveryGroup != null)
{
- discoveryGroup.stop();
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
}
- catch (Exception e)
+ else
{
- log.error("Failed to stop discovery group", e);
+ staticConnector.disconnect();
}
}
- else
- {
- staticConnector.disconnect();
- }
-
+
synchronized (connectingFactories)
{
for (ClientSessionFactoryInternal csf : connectingFactories)
{
- csf.close();
+ csf.close();
}
connectingFactories.clear();
}
-
+
synchronized (factories)
{
Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
@@ -1320,7 +1327,7 @@
readOnly = false;
state = STATE.CLOSED;
-
+
}
/** This is directly called when the connection to the node is gone,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -59,4 +59,6 @@
* Basically this is for cluster bridges being disconnected
*/
void disconnect();
+
+ boolean isConnected();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -65,4 +65,6 @@
void informTopology();
void announceBackup();
+
+ boolean isNodeActive(String id);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -131,6 +131,8 @@
private NotificationService notificationService;
+ private boolean stopping = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -198,7 +200,7 @@
{
this.notificationService = notificationService;
}
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -208,6 +210,8 @@
started = true;
+ stopping = false;
+
if (activated)
{
activate();
@@ -221,7 +225,7 @@
notificationService.sendNotification(notification);
}
}
-
+
public String debug()
{
return toString();
@@ -304,20 +308,32 @@
});
}
- /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break.
- * This method is intended to expose this executor to the ClusterManager */
+ public boolean isConnected()
+ {
+ return session != null;
+ }
+
+ /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break.
+ * This method is intended to expose this executor to the ClusterManager */
public Executor getExecutor()
{
return executor;
}
-
+
public void stop() throws Exception
{
+ if (stopping)
+ {
+ return;
+ }
+
+ stopping = true;
+
if (log.isDebugEnabled())
{
log.debug("Bridge " + this.name + " being stopped");
}
-
+
if (futureScheduledReconnection != null)
{
futureScheduledReconnection.cancel(true);
@@ -470,7 +486,10 @@
{
if (log.isDebugEnabled())
{
- log.debug("The transformer " + transformer + " made a copy of the message " + message + " as transformedMessage");
+ log.debug("The transformer " + transformer +
+ " made a copy of the message " +
+ message +
+ " as transformedMessage");
}
}
return transformedMessage;
@@ -543,12 +562,12 @@
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
// with any messages resent
-
+
if (log.isTraceEnabled())
{
log.trace("going to send message " + message);
}
-
+
try
{
producer.send(dest, message);
@@ -579,7 +598,7 @@
{
producer.close();
}
-
+
csf.cleanup();
}
catch (Throwable dontCare)
@@ -680,7 +699,6 @@
return csf;
}
-
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
@@ -806,6 +824,12 @@
return;
}
+ if (stopping)
+ {
+ log.info("Bridge is stopping, will not retry");
+ return;
+ }
+
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
{
log.warn("Bridge " + this.name +
@@ -968,7 +992,10 @@
{
public synchronized void run()
{
- connect();
+ if (!stopping)
+ {
+ connect();
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -71,7 +71,7 @@
private final SimpleString idsHeaderName;
private final String targetNodeID;
-
+
private final long targetNodeEventUID;
private final ServerLocatorInternal discoveryLocator;
@@ -150,10 +150,11 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
-
+
if (factory == null)
{
- log.warn("NodeID=" + targetNodeID + " is not available on the topology. Retrying the connection to that node now");
+ log.warn("NodeID=" + targetNodeID +
+ " is not available on the topology. Retrying the connection to that node now");
return null;
}
factory.setReconnectAttempts(0);
@@ -181,7 +182,7 @@
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
ServerMessage messageCopy = message.copy();
-
+
if (log.isTraceEnabled())
{
log.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery");
@@ -192,7 +193,7 @@
Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
-
+
if (queueIds == null)
{
// Sanity check only
@@ -215,7 +216,7 @@
messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);
-
+
return messageCopy;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -142,15 +142,14 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
-
-
+
// Stuff that used to be on the ClusterManager
-
private final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
+ private boolean stopping = false;
public ClusterConnectionImpl(final ClusterManagerInternal manager,
final TransportConfiguration[] tcConfigs,
@@ -214,7 +213,7 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
-
+
this.topology.setExecutor(executor);
this.server = server;
@@ -325,7 +324,7 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
-
+
this.topology.setExecutor(executor);
this.server = server;
@@ -359,7 +358,7 @@
this.manager = manager;
}
- public void start() throws Exception
+ public void start() throws Exception
{
synchronized (this)
{
@@ -367,10 +366,10 @@
{
return;
}
-
-
+
+ stopping = false;
started = true;
-
+
if (!backup)
{
activate();
@@ -378,7 +377,7 @@
}
}
-
+
public void flushExecutor()
{
Future future = new Future();
@@ -395,7 +394,7 @@
{
return;
}
-
+ stopping = true;
if (log.isDebugEnabled())
{
log.debug(this + "::stopping ClusterConnection");
@@ -435,9 +434,7 @@
props);
managementService.sendNotification(notification);
}
-
-
executor.execute(new Runnable()
{
public void run()
@@ -463,36 +460,35 @@
started = false;
}
-
public void announceBackup()
{
executor.execute(new Runnable()
{
public void run()
{
- try
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
- }
- ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- if (backupSessionFactory != null)
- {
- backupSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new NodeAnnounceMessage(System.currentTimeMillis(),
- nodeUUID.toString(),
- true,
- connector,
- null));
- log.info("backup announced");
- }
- }
- catch (Exception e)
- {
- log.warn("Unable to announce backup, retrying", e);
- }
+ try
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+ }
+ ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
+ log.info("backup announced");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup, retrying", e);
+ }
}
});
}
@@ -501,7 +497,7 @@
{
return topology.getMember(manager.getNodeId());
}
-
+
public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topology.addClusterTopologyListener(listener);
@@ -519,7 +515,7 @@
{
return topology;
}
-
+
public void nodeAnnounced(final long uniqueEventID,
final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
@@ -581,6 +577,16 @@
return server;
}
+ public boolean isNodeActive(String nodeId)
+ {
+ MessageFlowRecord rec = records.get(nodeId);
+ if (rec == null)
+ {
+ return false;
+ }
+ return rec.getBridge().isConnected();
+ }
+
public Map<String, String> getNodes()
{
synchronized (records)
@@ -610,7 +616,7 @@
}
backup = false;
-
+
topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
if (backupServerLocator != null)
@@ -627,8 +633,6 @@
backupServerLocator = null;
}
-
-
serverLocator = clusterConnector.createServerLocator(true);
if (serverLocator != null)
@@ -696,6 +700,10 @@
public void nodeDown(final long eventUID, final String nodeID)
{
+ if (stopping)
+ {
+ return;
+ }
if (log.isDebugEnabled())
{
log.debug(this + " receiving nodeDown for nodeID=" + nodeID, new Exception("trace"));
@@ -731,6 +739,10 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (stopping)
+ {
+ return;
+ }
if (log.isDebugEnabled())
{
String ClusterTestBase = "receiving nodeUP for nodeID=";
@@ -831,13 +843,13 @@
}
}
}
-
+
public synchronized void informTopology()
{
String nodeID = server.getNodeID().toString();
-
+
TopologyMember localMember;
-
+
if (backup)
{
localMember = new TopologyMember(null, connector);
@@ -850,7 +862,6 @@
topology.updateAsLive(nodeID, localMember);
}
-
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
@@ -859,21 +870,21 @@
final boolean start) throws Exception
{
final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, true, connector);
-
+
String nodeId;
-
+
synchronized (this)
{
if (!started)
{
return;
}
-
+
if (serverLocator == null)
{
return;
}
-
+
nodeId = serverLocator.getNodeID();
}
@@ -1514,8 +1525,9 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl@" + System.identityHashCode(this) +
- "[nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" +
+ nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1565,7 +1577,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1597,7 +1609,7 @@
public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
return locator;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -33,7 +33,13 @@
{
return false;
}
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
protected void setupCluster() throws Exception
{
setupCluster(false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -28,7 +28,6 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
/**
* A IsolatedTopologyTest
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.hornetq.api.core.HornetQException;
@@ -70,6 +71,11 @@
abstract protected boolean isNetty() throws Exception;
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -114,11 +120,12 @@
{
ok = (ok && actual.contains(nodeIDs[expected[i]]));
}
- if (ok)
+ if (ok)
{
return;
}
- } while(System.currentTimeMillis() - start < 5000);
+ }
+ while (System.currentTimeMillis() - start < 5000);
fail("did not contain all expected node ID: " + actual);
}
@@ -145,8 +152,8 @@
{
if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
{
- ClientSessionFactory sf = locator.createSessionFactory();
- return sf.createSession();
+ ClientSessionFactory sf = locator.createSessionFactory();
+ return sf.createSession();
}
else
{
@@ -174,7 +181,14 @@
for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
{
- nodesCount += clusterConn.getNodes().size();
+ Map<String, String> nodes = clusterConn.getNodes();
+ for (String id : nodes.keySet())
+ {
+ if (clusterConn.isNodeActive(id))
+ {
+ nodesCount++;
+ }
+ }
}
if (nodesCount == count)
@@ -185,85 +199,92 @@
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
-
+
log.error(clusterDescription(servers[node]));
throw new IllegalStateException("Timed out waiting for cluster connections ");
}
+
public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
{
startServers(0);
ServerLocator locator = createHAServerLocator();
-
- ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
+ ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID,
- String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if(!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- System.out.println("Node UP " + nodeID + " added");
- log.info("Node UP " + nodeID + " added");
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ System.out.println("Node UP " + nodeID + " added");
+ log.info("Node UP " + nodeID + " added");
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ else
+ {
+ System.out.println("Node UP " + nodeID + " was already here");
+ log.info("Node UP " + nodeID + " was already here");
+ }
}
- else
- {
- System.out.println("Node UP " + nodeID + " was already here");
- log.info("Node UP " + nodeID + " was already here");
- }
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- log.info("Node down " + nodeID + " accepted");
- System.out.println("Node down " + nodeID + " accepted");
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ log.info("Node down " + nodeID + " accepted");
+ System.out.println("Node down " + nodeID + " accepted");
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
+ else
+ {
+ log.info("Node down " + nodeID + " already removed");
+ System.out.println("Node down " + nodeID + " already removed");
+ }
}
- else
- {
- log.info("Node down " + nodeID + " already removed");
- System.out.println("Node down " + nodeID + " already removed");
- }
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- startServers(1, 4, 3, 2);
- String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+ startServers(1, 4, 3, 2);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
- assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
- checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
- stopServers(2, 3, 1, 4);
+ stopServers(2, 3, 1, 4);
- assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
- checkContains(new int[] { 0 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
- sf.close();
-
- locator.close();
-
- stopServers(0);
+ sf.close();
+ }
+ finally
+ {
+ locator.close();
+
+ stopServers(0);
+ }
+
}
public void testReceiveNotifications() throws Throwable
@@ -273,73 +294,81 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID,
- String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
}
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- ClientSession session = sf.createSession();
-
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ ClientSession session = sf.createSession();
- stopServers(0);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+ stopServers(0);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
- stopServers(2);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+ stopServers(2);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
- stopServers(4);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+ stopServers(4);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
- stopServers(3);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1 }, nodeIDs, nodes);
+ stopServers(3);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
- stopServers(1);
-
- assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
- checkContains(new int[] {}, nodeIDs, nodes);
+ stopServers(1);
- sf.close();
-
- locator.close();
+ assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
+ checkContains(new int[] {}, nodeIDs, nodes);
+
+ sf.close();
+ }
+ finally
+ {
+ locator.close();
+ }
+
}
+
public void testStopNodes() throws Throwable
{
startServers(0, 1, 2, 3, 4);
@@ -347,80 +376,87 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
+ try
+ {
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ }
}
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ ClientSession session = sf.createSession();
- ClientSession session = sf.createSession();
-
- stopServers(0);
- assertFalse(servers[0].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+ stopServers(0);
+ assertFalse(servers[0].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
- stopServers(2);
- assertFalse(servers[2].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+ stopServers(2);
+ assertFalse(servers[2].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
- stopServers(4);
- assertFalse(servers[4].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+ stopServers(4);
+ assertFalse(servers[4].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
- stopServers(3);
- assertFalse(servers[3].isStarted());
+ stopServers(3);
+ assertFalse(servers[3].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1 }, nodeIDs, nodes);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
- stopServers(1);
- assertFalse(servers[1].isStarted());
- try
+ stopServers(1);
+ assertFalse(servers[1].isStarted());
+ try
+ {
+ session = checkSessionOrReconnect(session, locator);
+ fail();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ finally
{
- session = checkSessionOrReconnect(session, locator);
- fail();
+ locator.close();
}
- catch (Exception e)
- {
- }
-
- locator.close();
}
-
+
public void testMultipleClientSessionFactories() throws Throwable
{
startServers(0, 1, 2, 3, 4);
@@ -428,67 +464,75 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
}
- }
- });
+ });
- ClientSessionFactory[] sfs = new ClientSessionFactory[] {
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory() };
- assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ ClientSessionFactory[] sfs = new ClientSessionFactory[] { locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory() };
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
- //we cant close all of the servers, we need to leave one up to notify us
- stopServers(4, 2, 3, 1);
+ // we can't close all of the servers, we need to leave one up to notify us
+ stopServers(4, 2, 3, 1);
- boolean ok = downLatch.await(10, SECONDS);
- if(!ok)
- {
- System.out.println("TopologyClusterTestBase.testMultipleClientSessionFactories");
+ boolean ok = downLatch.await(10, SECONDS);
+ if (!ok)
+ {
+ log.warn("TopologyClusterTestBase.testMultipleClientSessionFactories will fail");
+ }
+ assertTrue("Was not notified that all servers are Down", ok);
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+ for (int i = 0; i < sfs.length; i++)
+ {
+ ClientSessionFactory sf = sfs[i];
+ sf.close();
+ }
}
- assertTrue("Was not notified that all servers are Down", ok);
- checkContains(new int[] { 0 }, nodeIDs, nodes);
-
- for (int i = 0; i < sfs.length; i++)
+ finally
{
- ClientSessionFactory sf = sfs[i];
- sf.close();
+ locator.close();
+
+ stopServers(0);
}
-
- locator.close();
- stopServers(0);
}
// Private -------------------------------------------------------
13 years, 1 month
JBoss hornetq SVN: r11620 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-31 11:18:27 -0400 (Mon, 31 Oct 2011)
New Revision: 11620
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
added fix for backward compatibility
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-30 02:22:37 UTC (rev 11619)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
@@ -57,6 +57,9 @@
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
+ /*needed for backward compatibility*/
+ private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+ /*end of compatibility fixes*/
private enum STATE{ INITIALIZED, CLOSED, CLOSING};
private static final long serialVersionUID = -1615857864410205260L;
13 years, 1 month
JBoss hornetq SVN: r11619 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-29 22:22:37 -0400 (Sat, 29 Oct 2011)
New Revision: 11619
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
Log:
Reverting changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-30 02:21:53 UTC (rev 11618)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-30 02:22:37 UTC (rev 11619)
@@ -961,7 +961,6 @@
sb.append(getJMSMessageID());
sb.append("]:");
sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
- sb.append(",coreMessage=" + message);
return sb.toString();
}
13 years, 1 month
JBoss hornetq SVN: r11618 - branches/Branch_2_2_EAP/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-29 22:21:53 -0400 (Sat, 29 Oct 2011)
New Revision: 11618
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
Log:
Reverting changes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-29 21:33:03 UTC (rev 11617)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-30 02:21:53 UTC (rev 11618)
@@ -25,12 +25,8 @@
import static org.hornetq.utils.DataConstants.SHORT;
import static org.hornetq.utils.DataConstants.STRING;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
@@ -142,7 +138,7 @@
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
-
+
public void putNullValue(final SimpleString key)
{
checkCreateProperties();
@@ -608,65 +604,9 @@
@Override
public String toString()
{
- StringWriter strWriter = new StringWriter();
- PrintWriter out = new PrintWriter(strWriter);
- Iterator<Entry<SimpleString, PropertyValue>> propIter = properties.entrySet().iterator();
- while (propIter.hasNext())
- {
- Entry<SimpleString, PropertyValue> item = propIter.next();
-
- // When debugging TypedProperties, we need to identify the actual array on PrintData and other log outputs
- if (item.getKey().toString().startsWith("_HQ_ROUTE_TO"))
- {
- String outstr = toLongArray(item.getValue());
-
- out.print(item.getKey() + "=" + outstr);
- }
- else
- {
- out.print(item.getKey() + "=" + item.getValue());
- }
- if (propIter.hasNext())
- {
- out.print(",");
- }
- }
- return "TypedProperties[" + strWriter.toString() + "]";
+ return "TypedProperties[" + properties + "]";
}
- /**
- * @param item
- * @return
- */
- private String toLongArray(PropertyValue value)
- {
- StringBuffer outstr = new StringBuffer();
-
- try
- {
- byte[] ids = (byte [])value.getValue();
-
- ByteBuffer buff = ByteBuffer.wrap(ids);
-
- while (buff.hasRemaining())
- {
- long bindingID = buff.getLong();
- outstr.append(bindingID);
- if (buff.hasRemaining())
- {
- outstr.append(",");
- }
- }
- }
- catch (Throwable e)
- {
- log.warn(e.getMessage(), e);
- outstr = new StringBuffer();
- outstr.append(value.toString());
- }
- return "[" + outstr.toString() + "]";
- }
-
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties()
13 years, 1 month
JBoss hornetq SVN: r11617 - in branches/Branch_2_2_EAP/src/main/org/hornetq: core/client/impl and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-10-29 17:33:03 -0400 (Sat, 29 Oct 2011)
New Revision: 11617
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
Log:
JBPAPP-7333 - Fixing reconnection on clustered bridge
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ServerLocator.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -54,6 +54,15 @@
* Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
* This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
* @param transportConfiguration
+ * @return The ClientSessionFactory or null if the node is not present on the topology
+ * @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
+ */
+ ClientSessionFactory createSessionFactory(final String nodeID) throws Exception;
+
+ /**
+ * Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
+ * This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
+ * @param transportConfiguration
* @return The ClientSesionFactory
* @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
*/
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -598,6 +598,40 @@
{
return afterConnectListener;
}
+
+ public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
+ {
+ log.info(topology.describe("full topology"));
+ TopologyMember topologyMember = topology.getMember(nodeID);
+
+ log.info("Creating connection factory towards " + nodeID + " = " + topologyMember);
+
+ if (topologyMember == null)
+ {
+ return null;
+ }
+ else
+ if (topologyMember.getA() != null)
+ {
+ ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
+ if (topologyMember.getB() != null)
+ {
+ factory.setBackupConnector(topologyMember.getA(), topologyMember.getB());
+ }
+ return factory;
+ }
+ else if (topologyMember.getA() == null && topologyMember.getB() != null)
+ {
+ // This shouldn't happen, however I wanted this to consider all possible cases
+ ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getB());
+ return factory;
+ }
+ else
+ {
+ // it shouldn't happen
+ return null;
+ }
+ }
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -815,6 +815,11 @@
PagingStoreImpl.log.warn("Messages are being dropped on address " + getStoreName());
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Message " + message + " beig dropped for fullAddressPolicy==DROP");
+ }
// Address is full, we just pretend we are paging, and drop the data
return true;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -595,6 +595,10 @@
{
binding.route(message, context);
}
+ else
+ {
+ log.warn("Couldn't find binding with id=" + bindingID + " on routeFromCluster for message=" + message + " binding = " + this);
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -611,6 +611,19 @@
{
bindings.route(message, context);
}
+ else
+ {
+ // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
+ if (log.isDebugEnabled())
+ {
+ log.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
+ }
+ }
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Message after routed=" + message);
+ }
if (context.getQueueCount() == 0)
{
@@ -625,6 +638,11 @@
// Send to the DLA for the address
SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("sending message to dla address = " + dlaAddress + ", message=" + message);
+ }
if (dlaAddress == null)
{
@@ -641,6 +659,13 @@
route(message, context.getTransaction(), false);
}
}
+ else
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address);
+ }
+ }
}
else
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -52,6 +52,11 @@
{
throw new IllegalStateException("Binding already exists " + binding);
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception ("trace"));
+ }
return addMappingInternal(binding.getAddress(), binding);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -514,6 +514,7 @@
{
if (requiresResponse)
{
+ log.debug("Sending exception to client", e);
response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
}
else
@@ -525,6 +526,7 @@
{
if (requiresResponse)
{
+ log.debug("Sending exception to client", e);
response = new HornetQExceptionMessage((HornetQException)e);
}
else
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -701,6 +701,12 @@
if (csf == null || csf.isClosed())
{
csf = createSessionFactory();
+ if (csf == null)
+ {
+ // Retrying. This probably means the node is not available (for the cluster connection case)
+ scheduleRetryConnect();
+ return;
+ }
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -74,8 +74,6 @@
private final long targetNodeEventUID;
- private final TransportConfiguration connector;
-
private final ServerLocatorInternal discoveryLocator;
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
@@ -138,7 +136,6 @@
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
- this.connector = connector;
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
@@ -152,7 +149,14 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
- ClientSessionFactoryInternal factory = super.createSessionFactory();
+ ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
+
+ if (factory == null)
+ {
+ log.warn("NodeID=" + targetNodeID + " is not available on the topology. Retrying the connection to that node now");
+ return null;
+ }
+ factory.setReconnectAttempts(0);
factory.getConnection().addFailureListener(new FailureListener()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -311,6 +311,11 @@
buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Adding remoteQueue ID = " + remoteQueueID + " into message=" + message + " store-forward-queue=" + storeAndForwardQueue);
+ }
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/DivertImpl.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -84,6 +84,11 @@
// properly on ack, since the original address will be overwritten
// TODO we can optimise this so it doesn't copy if it's not routed anywhere else
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("Diverting message " + message + " into " + this);
+ }
long id = storageManager.generateUniqueID();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessage.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -961,6 +961,7 @@
sb.append(getJMSMessageID());
sb.append("]:");
sb.append(message.isDurable() ? "PERSISTENT" : "NON-PERSISTENT");
+ sb.append(",coreMessage=" + message);
return sb.toString();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-28 13:00:52 UTC (rev 11616)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/TypedProperties.java 2011-10-29 21:33:03 UTC (rev 11617)
@@ -25,8 +25,12 @@
import static org.hornetq.utils.DataConstants.SHORT;
import static org.hornetq.utils.DataConstants.STRING;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
@@ -138,7 +142,7 @@
checkCreateProperties();
doPutValue(key, value == null ? new NullValue() : new StringValue(value));
}
-
+
public void putNullValue(final SimpleString key)
{
checkCreateProperties();
@@ -604,9 +608,65 @@
@Override
public String toString()
{
- return "TypedProperties[" + properties + "]";
+ StringWriter strWriter = new StringWriter();
+ PrintWriter out = new PrintWriter(strWriter);
+ Iterator<Entry<SimpleString, PropertyValue>> propIter = properties.entrySet().iterator();
+ while (propIter.hasNext())
+ {
+ Entry<SimpleString, PropertyValue> item = propIter.next();
+
+ // When debugging TypedProperties, we need to identify the actual array on PrintData and other log outputs
+ if (item.getKey().toString().startsWith("_HQ_ROUTE_TO"))
+ {
+ String outstr = toLongArray(item.getValue());
+
+ out.print(item.getKey() + "=" + outstr);
+ }
+ else
+ {
+ out.print(item.getKey() + "=" + item.getValue());
+ }
+ if (propIter.hasNext())
+ {
+ out.print(",");
+ }
+ }
+ return "TypedProperties[" + strWriter.toString() + "]";
}
+ /**
+ * @param item
+ * @return
+ */
+ private String toLongArray(PropertyValue value)
+ {
+ StringBuffer outstr = new StringBuffer();
+
+ try
+ {
+ byte[] ids = (byte [])value.getValue();
+
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ while (buff.hasRemaining())
+ {
+ long bindingID = buff.getLong();
+ outstr.append(bindingID);
+ if (buff.hasRemaining())
+ {
+ outstr.append(",");
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ outstr = new StringBuffer();
+ outstr.append(value.toString());
+ }
+ return "[" + outstr.toString() + "]";
+ }
+
// Private ------------------------------------------------------------------------------------
private void checkCreateProperties()
13 years, 1 month
JBoss hornetq SVN: r11616 - trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-28 09:00:52 -0400 (Fri, 28 Oct 2011)
New Revision: 11616
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
Fix NPE, when replicationEndpoint is closed before initialization.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-28 13:00:40 UTC (rev 11615)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-28 13:00:52 UTC (rev 11616)
@@ -330,11 +330,15 @@
}
filesReservedForSync.clear();
- for (Journal j : journals)
+ if (journals != null)
{
- if (j instanceof FileWrapperJournal)
- j.stop();
+ for (Journal j : journals)
+ {
+ if (j instanceof FileWrapperJournal)
+ j.stop();
+ }
}
+
pageManager.stop();
// Storage needs to be the last to stop
13 years, 1 month
JBoss hornetq SVN: r11615 - trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-28 09:00:40 -0400 (Fri, 28 Oct 2011)
New Revision: 11615
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Use locks on confirmPendingLargeMessage(|TX), methods were added on merge from 2_2_EAP.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-28 13:00:26 UTC (rev 11614)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-28 13:00:40 UTC (rev 11615)
@@ -349,66 +349,66 @@
return replicator != null;
}
- /**
- * @param replicationManager
- * @param pagingManager
- * @throws HornetQException
- */
+ /**
+ * @param replicationManager
+ * @param pagingManager
+ * @throws HornetQException
+ */
@Override
- public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
- {
- if (!started)
- {
- throw new IllegalStateException("JournalStorageManager must be started...");
- }
- assert replicationManager != null;
+ public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
+ {
+ if (!started)
+ {
+ throw new IllegalStateException("JournalStorageManager must be started...");
+ }
+ assert replicationManager != null;
- if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "journals here are not JournalImpl. You can't set a replicator!");
- }
- JournalFile[] messageFiles = null;
- JournalFile[] bindingsFiles = null;
+ if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "journals here are not JournalImpl. You can't set a replicator!");
+ }
+ JournalFile[] messageFiles = null;
+ JournalFile[] bindingsFiles = null;
- final Journal localMessageJournal = messageJournal;
- final Journal localBindingsJournal = bindingsJournal;
+ final Journal localMessageJournal = messageJournal;
+ final Journal localBindingsJournal = bindingsJournal;
- Map<String, Long> largeMessageFilesToSync;
- Map<SimpleString, Collection<Integer>> pageFilesToSync;
- storageManagerLock.writeLock().lock();
+ Map<String, Long> largeMessageFilesToSync;
+ Map<SimpleString, Collection<Integer>> pageFilesToSync;
+ storageManagerLock.writeLock().lock();
try
- {
- replicator = replicationManager;
- localMessageJournal.synchronizationLock();
- localBindingsJournal.synchronizationLock();
+ {
+ replicator = replicationManager;
+ localMessageJournal.synchronizationLock();
+ localBindingsJournal.synchronizationLock();
try
- {
+ {
pagingManager.lock();
try
- {
- messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
- pageFilesToSync = getPageInformationForSync(pagingManager);
- largeMessageFilesToSync = getLargeMessageInformation();
- }
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ largeMessageFilesToSync = getLargeMessageInformation();
+ }
finally
- {
+ {
pagingManager.unlock();
- }
- }
+ }
+ }
finally
- {
- localMessageJournal.synchronizationUnlock();
- localBindingsJournal.synchronizationUnlock();
- }
+ {
+ localMessageJournal.synchronizationUnlock();
+ localBindingsJournal.synchronizationUnlock();
+ }
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
- }
+ }
finally
- {
- storageManagerLock.writeLock().unlock();
- }
+ {
+ storageManagerLock.writeLock().unlock();
+ }
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
@@ -416,36 +416,35 @@
sendPagesToBackup(pageFilesToSync, pagingManager);
storageManagerLock.writeLock().lock();
- try
- {
- replicator.sendSynchronizationDone();
- // XXX HORNETQ-720 SEND a compare journals message?
- }
- finally
- {
- storageManagerLock.writeLock().unlock();
- }
- }
+ try
+ {
+ replicator.sendSynchronizationDone();
+ // XXX HORNETQ-720 SEND a compare journals message?
+ }
+ finally
+ {
+ storageManagerLock.writeLock().unlock();
+ }
+ }
+ /**
+ * @param pageFilesToSync
+ * @throws Exception
+ */
+ private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
+ throws Exception
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
+ {
+ if (!started)
+ return;
+ PagingStore store = manager.getPageStore(entry.getKey());
+ store.sendPages(replicator, entry.getValue());
+ }
+ }
/**
- * @param pageFilesToSync
- * @throws Exception
- */
- private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
- throws Exception
- {
- for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
- {
- if (!started)
- return;
- PagingStore store = manager.getPageStore(entry.getKey());
- store.sendPages(replicator, entry.getValue());
- }
- }
-
- /**
* @param pagingManager
* @return
* @throws Exception
@@ -573,16 +572,18 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.api.core.buffers.ChannelBuffer)
- */
- public void pageWrite(final PagedMessage message, final int pageNumber)
- {
- if (isReplicated())
- {
- replicator.pageWrite(message, pageNumber);
- }
- }
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString,
+ * int, org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public void pageWrite(final PagedMessage message, final int pageNumber)
+ {
+ if (isReplicated())
+ {
+ replicator.pageWrite(message, pageNumber);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
@@ -637,58 +638,58 @@
return new LargeServerMessageImpl(this);
}
- protected final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes)
- throws Exception
- {
- readLock();
+ protected final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes)
+ throws Exception
+ {
+ readLock();
try
- {
- file.position(file.size());
+ {
+ file.position(file.size());
- file.writeDirect(ByteBuffer.wrap(bytes), false);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
- if (isReplicated())
- {
- replicator.largeMessageWrite(messageId, bytes);
- }
- }
+ if (isReplicated())
+ {
+ replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
+ }
public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception
- {
- readLock();
+ {
+ readLock();
try
- {
- if (isReplicated())
- {
- replicator.largeMessageBegin(id);
- }
+ {
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(id);
+ }
- LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+ LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
- largeMessage.copyHeadersAndProperties(message);
+ largeMessage.copyHeadersAndProperties(message);
- largeMessage.setMessageID(id);
+ largeMessage.setMessageID(id);
- if (largeMessage.isDurable())
- {
- // We store a marker on the journal that the large file is pending
- long pendingRecordID = storePendingLargeMessage(id);
+ if (largeMessage.isDurable())
+ {
+ // We store a marker on the journal that the large file is pending
+ long pendingRecordID = storePendingLargeMessage(id);
- largeMessage.setPendingRecordID(pendingRecordID);
- }
+ largeMessage.setPendingRecordID(pendingRecordID);
+ }
- return largeMessage;
+ return largeMessage;
}
- finally
- {
- readUnLock();
- }
- }
+ finally
+ {
+ readUnLock();
+ }
+ }
// Non transactional operations
@@ -713,100 +714,104 @@
}
}
- public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+ public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception
+ {
+ readLock();
+ try
{
- installLargeMessageConfirmationOnTX(tx, recordID);
- messageJournal.appendDeleteRecordTransactional(tx.getID(),
- recordID,
- new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ installLargeMessageConfirmationOnTX(tx, recordID);
+ messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID,
+ new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
-
- /** We don't need messageID now but we are likely to need it we ever decide to support a database */
- public void confirmPendingLargeMessage(long recordID) throws Exception
+ finally
{
- messageJournal.appendDeleteRecord(recordID, true, getContext());
+ readUnLock();
}
+ }
- public void storeMessage(final ServerMessage message) throws Exception
- {
- if (message.getMessageID() <= 0)
- {
- // Sanity check only... this shouldn't happen unless there is a bug
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
- }
-
- readLock();
+ /** We don't need messageID now but we are likely to need it we ever decide to support a database */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ readLock();
try
- {
- // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
-
- if (message.isLargeMessage())
- {
- messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding((LargeServerMessage)message),
- false,
- getContext(false));
- }
- else
- {
- messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message,
- false,
- getContext(false));
- }
- }
+ {
+ messageJournal.appendDeleteRecord(recordID, true, getContext());
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
-
+ public void storeMessage(final ServerMessage message) throws Exception
+ {
+ if (message.getMessageID() <= 0)
{
- readLock();
+ // Sanity check only... this shouldn't happen unless there is a bug
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+ }
+
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID),
- last && syncNonTransactional,
- getContext(last && syncNonTransactional));
- }
+ {
+ // Note that we don't sync, the add reference that comes immediately after will sync if
+ // appropriate
+
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecord(message.getMessageID(), JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding((LargeServerMessage)message), false,
+ getContext(false));
+ }
+ else
+ {
+ messageJournal.appendAddRecord(message.getMessageID(), JournalStorageManager.ADD_MESSAGE, message, false,
+ getContext(false));
+ }
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- private void readLock()
+ public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
+ {
+ readLock();
+ try
{
- storageManagerLock.readLock().lock();
+ messageJournal.appendUpdateRecord(messageID, JournalStorageManager.ADD_REF, new RefEncoding(queueID), last &&
+ syncNonTransactional, getContext(last && syncNonTransactional));
}
-
- private void readUnLock()
+ finally
{
- storageManagerLock.readLock().unlock();
+ readUnLock();
}
+ }
- public void storeAcknowledge(final long queueID, final long messageID) throws Exception
- {
- readLock();
+ private void readLock()
+ {
+ storageManagerLock.readLock().lock();
+ }
+
+ private void readUnLock()
+ {
+ storageManagerLock.readLock().unlock();
+ }
+
+ public void storeAcknowledge(final long queueID, final long messageID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID),
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendUpdateRecord(messageID, JournalStorageManager.ACKNOWLEDGE_REF, new RefEncoding(queueID),
+ syncNonTransactional, getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
{
@@ -882,135 +887,125 @@
}
}
- public void deleteDuplicateID(final long recordID) throws Exception
- {
- readLock();
+ public void deleteDuplicateID(final long recordID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
// Transactional operations
- public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
+ public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
+ {
+ if (message.getMessageID() <= 0)
{
- if (message.getMessageID() <= 0)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
- }
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
+ }
- readLock();
+ readLock();
try
- {
- if (message.isLargeMessage())
- {
- messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding(((LargeServerMessage)message)));
- }
- else
- {
- messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message);
- }
+ {
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+ JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding(((LargeServerMessage)message)));
+ }
+ else
+ {
+ messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+ JournalStorageManager.ADD_MESSAGE, message);
+ }
- }
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
- {
- readLock();
+ public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
+ {
+ readLock();
try
- {
- pageTransaction.setRecordID(generateUniqueID());
-
- messageJournal.appendAddRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- pageTransaction);
- }
+ {
+ pageTransaction.setRecordID(generateUniqueID());
+ messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION, pageTransaction);
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
- {
- readLock();
+ public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages)
+ throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
- {
- readLock();
+ public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
+ syncNonTransactional, getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- readLock();
+ public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- readLock();
+ public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID)
+ throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
@@ -1201,48 +1196,46 @@
}
}
- public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
- {
- readLock();
+ public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- // Other operations
+ // Other operations
- public void updateDeliveryCount(final MessageReference ref) throws Exception
+ public void updateDeliveryCount(final MessageReference ref) throws Exception
+ {
+ // no need to store if it's the same value
+ // otherwise the journal will get OME in case of lots of redeliveries
+ if (ref.getDeliveryCount() != ref.getPersistedCount())
{
- // no need to store if it's the same value
- // otherwise the journal will get OME in case of lots of redeliveries
- if (ref.getDeliveryCount() != ref.getPersistedCount())
- {
- ref.setPersistedCount(ref.getDeliveryCount());
- DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo =
+ new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
- readLock();
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
+ {
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ JournalStorageManager.UPDATE_DELIVERY_COUNT, updateInfo,
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ syncNonTransactional, getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
}
+ }
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
{
@@ -1928,26 +1921,22 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
- */
- public long storePageCounter(long txID, long queueID, long value) throws Exception
- {
- readLock();
+ @Override
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ readLock();
try
- {
- final long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID,
- recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
- new PageCountRecord(queueID, value));
- return recordID;
- }
+ {
+ final long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID, value));
+ return recordID;
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
@@ -2184,11 +2173,11 @@
return;
}
Runnable deleteAction = new Runnable()
- {
- public void run()
- {
+ {
+ public void run()
+ {
try
- {
+ {
readLock();
try
{
@@ -2196,31 +2185,31 @@
{
replicator.largeMessageDelete(largeServerMessageImpl.getMessageID());
}
- file.delete();
- }
+ file.delete();
+ }
finally
{
readUnLock();
}
}
catch (Exception e)
- {
- JournalStorageManager.log.warn(e.getMessage(), e);
- }
- }
-
- };
-
- if (executor == null)
{
- deleteAction.run();
+ JournalStorageManager.log.warn(e.getMessage(), e);
}
- else
- {
- executor.execute(deleteAction);
- }
- }
+ }
+ };
+
+ if (executor == null)
+ {
+ deleteAction.run();
+ }
+ else
+ {
+ executor.execute(deleteAction);
+ }
+ }
+
/**
* @param messageID
* @return
@@ -2542,33 +2531,30 @@
}
}
- /**
- * @throws Exception
- */
- private void cleanupIncompleteFiles() throws Exception
- {
- if (largeMessagesFactory != null)
- {
- List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
- for (String tmpFile : tmpFiles)
- {
- SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
- file.delete();
- }
- }
- }
+ private void cleanupIncompleteFiles() throws Exception
+ {
+ if (largeMessagesFactory != null)
+ {
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+ for (String tmpFile : tmpFiles)
+ {
+ SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+ file.delete();
+ }
+ }
+ }
- private OperationContext getContext(final boolean sync)
- {
- if (sync)
- {
- return getContext();
- }
- else
- {
- return DummyOperationContext.getInstance();
- }
- }
+ private OperationContext getContext(final boolean sync)
+ {
+ if (sync)
+ {
+ return getContext();
+ }
+ else
+ {
+ return DummyOperationContext.getInstance();
+ }
+ }
private static ClassLoader getThisClassLoader()
{
@@ -2714,14 +2700,11 @@
public boolean isCommit;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
- }
+ public String toString()
+ {
+ return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
+ }
HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
{
@@ -2807,15 +2790,12 @@
return clusterName;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
- }
- }
+ public String toString()
+ {
+ return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
+ }
+ }
public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
@@ -2831,21 +2811,12 @@
{
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PersistentQueueBindingEncoding [id=" + id +
- ", name=" +
- name +
- ", address=" +
- address +
- ", filterString=" +
- filterString +
- "]";
- }
+ public String toString()
+ {
+ return "PersistentQueueBindingEncoding [id=" + id + ", name=" + name + ", address=" + address +
+ ", filterString=" + filterString + "]";
+ }
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
@@ -3017,15 +2988,11 @@
return 8 + 4;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
- }
-
+ public String toString()
+ {
+ return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
+ }
}
public static class QueueEncoding implements EncodingSupport
@@ -3058,33 +3025,30 @@
return 8;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "QueueEncoding [queueID=" + queueID + "]";
- }
+ public String toString()
+ {
+ return "QueueEncoding [queueID=" + queueID + "]";
+ }
}
private static class DeleteEncoding extends QueueEncoding
{
- public byte recordType;
+ public byte recordType;
- public long id;
+ public long id;
- public DeleteEncoding()
- {
- super();
- }
+ public DeleteEncoding()
+ {
+ super();
+ }
- public DeleteEncoding(final byte recordType, final long id)
- {
- this.recordType = recordType;
- this.id = id;
- }
+ public DeleteEncoding(final byte recordType, final long id)
+ {
+ this.recordType = recordType;
+ this.id = id;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
@@ -3132,18 +3096,15 @@
public static class PageUpdateTXEncoding implements EncodingSupport
{
- public long pageTX;
+ public long pageTX;
- public int recods;
+ public int recods;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
- }
+ public String toString()
+ {
+ return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
+ }
public PageUpdateTXEncoding()
{
@@ -3188,14 +3149,11 @@
{
long scheduledDeliveryTime;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
- }
+ public String toString()
+ {
+ return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
+ }
private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID)
{
@@ -3270,12 +3228,9 @@
return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
+ public String toString()
+ {
// this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
// and this may be useful to validate the journal on those tests
// You may uncomment these two lines on that case and replcate the toString for the PrintData
@@ -3341,14 +3296,11 @@
private static final class PageCountRecord implements EncodingSupport
{
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
- }
+ public String toString()
+ {
+ return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
+ }
PageCountRecord()
{
@@ -3396,14 +3348,11 @@
private static final class PageCountRecordInc implements EncodingSupport
{
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
- }
+ public String toString()
+ {
+ return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
+ }
PageCountRecordInc()
{
@@ -3475,14 +3424,11 @@
this.position = new PagePositionImpl();
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
- }
+ public String toString()
+ {
+ return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
+ }
public long queueID;
@@ -3520,57 +3466,50 @@
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
- private final Map<Long, ServerMessage> messages;
+ private final Map<Long, ServerMessage> messages;
- public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
- {
- super();
- this.messages = messages;
- }
+ public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
+ {
+ super();
+ this.messages = messages;
+ }
- public void failedTransaction(final long transactionID,
- final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
- for (RecordInfo record : records)
- {
- if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
- {
- byte[] data = record.data;
+ public void failedTransaction(final long transactionID, final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
+ {
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
+ {
+ byte[] data = record.data;
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
try
- {
- LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
- serverMessage.decrementDelayDeletionCount();
- }
+ {
+ LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
+ serverMessage.decrementDelayDeletionCount();
+ }
catch (Exception e)
- {
- JournalStorageManager.log.warn(e.getMessage(), e);
- }
- }
+ {
+ JournalStorageManager.log.warn(e.getMessage(), e);
}
- }
+ }
+ }
+ }
+ }
+ private static String describeRecord(RecordInfo info)
+ {
+ return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" +
+ newObjectEncoding(info);
}
- private static String describeRecord(RecordInfo info)
- {
- return "recordID=" + info.id +
- ";userRecordType=" +
- info.userRecordType +
- ";isUpdate=" +
- info.isUpdate +
- ";" +
- newObjectEncoding(info);
- }
+ private static String describeRecord(RecordInfo info, Object o)
+ {
+ return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
+ }
- private static String describeRecord(RecordInfo info, Object o)
- {
- return "recordID=" + info.id + ";userRecordType=" + info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
- }
-
// Encoding functions for binding Journal
public static Object newObjectEncoding(RecordInfo info)
@@ -3861,220 +3800,216 @@
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
- /**
- * @param fileFactory
- * @param journal
- * @throws Exception
- */
+ /**
+ * @param fileFactory
+ * @param journal
+ * @throws Exception
+ */
private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
- {
- List<JournalFile> files = journal.orderFiles();
+ {
+ List<JournalFile> files = journal.orderFiles();
- final PrintStream out = System.out;
+ final PrintStream out = System.out;
- for (JournalFile file : files)
- {
- out.println("#" + file);
+ for (JournalFile file : files)
+ {
+ out.println("#" + file);
- JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
- {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@UpdateTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+ }
- public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
- {
- out.println("operation@Update;" + describeRecord(recordInfo));
- }
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@Update;" + describeRecord(recordInfo));
+ }
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- out.println("operation@Rollback;txID=" + transactionID);
- }
+ public void onReadRollbackRecord(final long transactionID) throws Exception
+ {
+ out.println("operation@Rollback;txID=" + transactionID);
+ }
- public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
- {
- out.println("operation@Prepare,txID=" + transactionID +
- ",numberOfRecords=" +
- numberOfRecords +
- ",extraData=" +
- encode(extraData));
- }
+ public void
+ onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords)
+ throws Exception
+ {
+ out.println("operation@Prepare,txID=" + transactionID + ",numberOfRecords=" + numberOfRecords +
+ ",extraData=" + encode(extraData));
+ }
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@DeleteRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+ }
- public void onReadDeleteRecord(final long recordID) throws Exception
- {
- out.println("operation@DeleteRecord;recordID=" + recordID);
- }
+ public void onReadDeleteRecord(final long recordID) throws Exception
+ {
+ out.println("operation@DeleteRecord;recordID=" + recordID);
+ }
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- out.println("operation@Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
- }
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+ {
+ out.println("operation@Commit;txID=" + transactionID + ",numberOfRecords=" + numberOfRecords);
+ }
- public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecordTX;txID=" + transactionID + "," + describeRecord(recordInfo));
+ }
- public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
- {
- out.println("operation@AddRecord;" + describeRecord(recordInfo));
- }
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecord;" + describeRecord(recordInfo));
+ }
- public void markAsDataFile(final JournalFile file)
- {
- }
- });
+ public void markAsDataFile(final JournalFile file)
+ {
}
+ });
+ }
- out.println();
+ out.println();
- out.println("### Surviving Records Summary ###");
+ out.println("### Surviving Records Summary ###");
- List<RecordInfo> records = new LinkedList<RecordInfo>();
- List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+ List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
- journal.start();
+ journal.start();
- final StringBuffer bufferFailingTransactions = new StringBuffer();
+ final StringBuffer bufferFailingTransactions = new StringBuffer();
- int messageCount = 0;
- Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
- int preparedMessageCount = 0;
- Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>();
- journal.load(records, preparedTransactions, new TransactionFailureCallback()
- {
+ int messageCount = 0;
+ Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
+ int preparedMessageCount = 0;
+ Map<Long, Integer> preparedMessageRefCount = new HashMap<Long, Integer>();
+ journal.load(records, preparedTransactions, new TransactionFailureCallback()
+ {
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
- {
- bufferFailingTransactions.append("Transaction " + transactionID + " failed with these records:\n");
- for (RecordInfo info : records)
- {
- bufferFailingTransactions.append("- " + describeRecord(info) + "\n");
- }
-
- for (RecordInfo info : recordsToDelete)
- {
- bufferFailingTransactions.append("- " + describeRecord(info) + " <marked to delete>\n");
- }
-
- }
- }, false);
-
- for (RecordInfo info : records)
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ bufferFailingTransactions.append("Transaction " + transactionID + " failed with these records:\n");
+ for (RecordInfo info : records)
{
- Object o = newObjectEncoding(info);
- if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
- {
- messageCount++;
- }
- else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
- {
- ReferenceDescribe ref = (ReferenceDescribe)o;
- Integer count = messageRefCounts.get(ref.refEncoding.queueID);
- if (count == null)
- {
- count = 1;
- messageRefCounts.put(ref.refEncoding.queueID, count);
- }
- else
- {
- messageRefCounts.put(ref.refEncoding.queueID, count + 1);
- }
- }
- else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
- {
- AckDescribe ref = (AckDescribe)o;
- Integer count = messageRefCounts.get(ref.refEncoding.queueID);
- if (count == null)
- {
- messageRefCounts.put(ref.refEncoding.queueID, 0);
- }
- else
- {
- messageRefCounts.put(ref.refEncoding.queueID, count - 1);
- }
- }
- out.println(describeRecord(info, o));
+ bufferFailingTransactions.append("- " + describeRecord(info) + "\n");
}
- out.println();
- out.println("### Prepared TX ###");
-
- for (PreparedTransactionInfo tx : preparedTransactions)
+ for (RecordInfo info : recordsToDelete)
{
- System.out.println(tx.id);
- for (RecordInfo info : tx.records)
- {
- Object o = newObjectEncoding(info);
- out.println("- " + describeRecord(info, o));
- if (info.getUserRecordType() == 31)
- {
- preparedMessageCount++;
- }
- else if (info.getUserRecordType() == 32)
- {
- ReferenceDescribe ref = (ReferenceDescribe)o;
- Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
- if (count == null)
- {
- count = 1;
- preparedMessageRefCount.put(ref.refEncoding.queueID, count);
- }
- else
- {
- preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
- }
- }
- }
-
- for (RecordInfo info : tx.recordsToDelete)
- {
- out.println("- " + describeRecord(info) + " <marked to delete>");
- }
+ bufferFailingTransactions.append("- " + describeRecord(info) + " <marked to delete>\n");
}
- String missingTX = bufferFailingTransactions.toString();
+ }
+ }, false);
- if (missingTX.length() > 0)
+ for (RecordInfo info : records)
+ {
+ Object o = newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
+ {
+ messageCount++;
+ }
+ else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
{
- out.println();
- out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
+ count = 1;
+ messageRefCounts.put(ref.refEncoding.queueID, count);
}
-
- out.println(bufferFailingTransactions.toString());
-
- out.println("### Message Counts ###");
- out.println("message count=" + messageCount);
- out.println("message reference count");
- for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet())
+ else
{
- System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ messageRefCounts.put(ref.refEncoding.queueID, count + 1);
}
+ }
+ else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+ {
+ AckDescribe ref = (AckDescribe)o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, 0);
+ }
+ else
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, count - 1);
+ }
+ }
+ out.println(describeRecord(info, o));
+ }
- out.println("prepared message count=" + preparedMessageCount);
+ out.println();
+ out.println("### Prepared TX ###");
- for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet())
+ for (PreparedTransactionInfo tx : preparedTransactions)
+ {
+ System.out.println(tx.id);
+ for (RecordInfo info : tx.records)
+ {
+ Object o = newObjectEncoding(info);
+ out.println("- " + describeRecord(info, o));
+ if (info.getUserRecordType() == 31)
{
- System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ preparedMessageCount++;
}
+ else if (info.getUserRecordType() == 32)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = 1;
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
+ }
+ }
+ }
- journal.stop();
- }
+ for (RecordInfo info : tx.recordsToDelete)
+ {
+ out.println("- " + describeRecord(info) + " <marked to delete>");
+ }
+ }
+ String missingTX = bufferFailingTransactions.toString();
+
+ if (missingTX.length() > 0)
+ {
+ out.println();
+ out.println("### Failed Transactions (Missing commit/prepare/rollback record) ###");
+ }
+
+ out.println(bufferFailingTransactions.toString());
+
+ out.println("### Message Counts ###");
+ out.println("message count=" + messageCount);
+ out.println("message reference count");
+ for (Map.Entry<Long, Integer> longIntegerEntry : messageRefCounts.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ }
+
+ out.println("prepared message count=" + preparedMessageCount);
+
+ for (Map.Entry<Long, Integer> longIntegerEntry : preparedMessageRefCount.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() + ",count=" + longIntegerEntry.getValue());
+ }
+
+ journal.stop();
+ }
+
@Override
- public boolean addToPage(PagingManager pagingManager,
- SimpleString address,
- ServerMessage message,
- RoutingContext ctx,
- RouteContextList listCtx) throws Exception
+ public boolean addToPage(PagingManager pagingManager, SimpleString address, ServerMessage message,
+ RoutingContext ctx, RouteContextList listCtx) throws Exception
{
readLock();
try
@@ -4088,16 +4023,17 @@
}
}
- private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
- {
- TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
- if (txoper == null)
- {
- txoper = new TXLargeMessageConfirmationOperation();
- tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
- }
- txoper.confirmedMessages.add(recordID);
- }
+ private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+ {
+ TXLargeMessageConfirmationOperation txoper =
+ (TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+ if (txoper == null)
+ {
+ txoper = new TXLargeMessageConfirmationOperation();
+ tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+ }
+ txoper.confirmedMessages.add(recordID);
+ }
class TXLargeMessageConfirmationOperation implements TransactionOperation
{
13 years, 1 month