JBoss hornetq SVN: r11360 - branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-16 23:38:54 -0400 (Fri, 16 Sep 2011)
New Revision: 11360
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
cluster cleanup
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 03:38:54 UTC (rev 11360)
@@ -475,7 +475,6 @@
true,
connector,
null));
- backupSessionFactory.close();
log.info("backup announced");
}
}
@@ -1555,7 +1554,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1587,7 +1586,7 @@
public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, dg);
return locator;
}
13 years, 3 months
JBoss hornetq SVN: r11359 - in branches/Branch_2_2_EAP_cluster4: src/main/org/hornetq/core/protocol/core/impl and 5 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-16 23:00:35 -0400 (Fri, 16 Sep 2011)
New Revision: 11359
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
cluster cleanup
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -1111,6 +1111,11 @@
}
}
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1282,8 +1287,8 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
-
- if (topology.removeMember(eventTime, nodeID))
+
+ if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -39,8 +39,12 @@
void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
- /** Used to better identify Cluster Connection Locators on logs while debugging logs */
+ /** Used to better identify Cluster Connection Locators on logs. To facilitate eventual debugging.
+ *
+ * This method used to be on the tests interface, but I'm now making it part of the public interface. */
void setIdentity(String identity);
+
+ String getIdentity();
void setNodeID(String nodeID);
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -108,7 +108,7 @@
{
if (log.isDebugEnabled())
{
- log.info(this + "::Live node " + nodeId + "=" + memberInput);
+ log.debug(this + "::node " + nodeId + "=" + memberInput);
}
memberInput.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
currentMember +
", memberInput=" +
memberInput +
- "newMember=" + newMember);
+ "newMember=" + newMember, new Exception ("trace"));
}
@@ -301,7 +301,7 @@
{
if (member.getUniqueEventID() > uniqueEventID)
{
- log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+ log.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
member = null;
}
else
@@ -482,22 +482,17 @@
public synchronized String describe(final String text)
{
- String desc = text + "\n";
+ String desc = text + "topology on " + this + ":\n";
for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(mapTopology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
}
desc += "\t" + "nodes=" + nodes() + "\t" + "members=" + members();
- return desc;
- }
-
- public void clear()
- {
- if (Topology.log.isDebugEnabled())
+ if (mapTopology.isEmpty())
{
- Topology.log.debug(this + "::clear", new Exception("trace"));
+ desc += "\tEmpty";
}
- mapTopology.clear();
+ return desc;
}
public int members()
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -178,16 +178,18 @@
};
final boolean isCC = msg.isClusterConnection();
-
- acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
-
- rc.addCloseListener(new CloseListener()
+ if (acceptorUsed.getClusterConnection() != null)
{
- public void connectionClosed()
+ acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+
+ rc.addCloseListener(new CloseListener()
{
- acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
- }
- });
+ public void connectionClosed()
+ {
+ acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ }
+ });
+ }
}
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -62,7 +62,7 @@
// for debug
String describe();
- void announceNode();
+ void informTopology();
void announceBackup();
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -49,6 +49,8 @@
void flushExecutor();
void announceBackup() throws Exception;
+
+ void deploy() throws Exception;
void deployBridge(BridgeConfiguration config) throws Exception;
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -449,7 +449,7 @@
public void announceBackup()
{
- this.backupServerLocator = clusterConnector.createServerLocator();
+ this.backupServerLocator = clusterConnector.createServerLocator(false);
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
@@ -475,9 +475,9 @@
true,
connector,
null));
+ backupSessionFactory.close();
log.info("backup announced");
}
- //backupSessionFactory.close();
}
catch (Exception e)
{
@@ -619,7 +619,7 @@
- serverLocator = clusterConnector.createServerLocator();
+ serverLocator = clusterConnector.createServerLocator(true);
if (serverLocator != null)
{
@@ -674,6 +674,7 @@
log.debug("sending notification: " + notification);
managementService.sendNotification(notification);
}
+
}
public TransportConfiguration getConnector()
@@ -780,7 +781,6 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
}
- log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -822,7 +822,7 @@
}
}
- public synchronized void announceNode()
+ public synchronized void informTopology()
{
String nodeID = server.getNodeID().toString();
@@ -1504,7 +1504,8 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" + nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1534,7 +1535,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator();
+ ServerLocatorInternal createServerLocator(boolean includeTopology);
}
private class StaticClusterConnector implements ClusterConnector
@@ -1546,7 +1547,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1554,7 +1555,9 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(topology, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, tcConfigs);
+ locator.setClusterConnection(true);
+ return locator;
}
else
{
@@ -1582,9 +1585,11 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- return new ServerLocatorImpl(topology, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
+ return locator;
+
}
}
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -168,13 +168,8 @@
return nodeUUID.toString();
}
- public synchronized void start() throws Exception
+ public synchronized void deploy() throws Exception
{
- if (started)
- {
- return;
- }
-
if (clustered)
{
for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
@@ -186,22 +181,48 @@
{
deployClusterConnection(config);
}
+ }
- for (ClusterConnection conn : clusterConnections.values())
+ for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+ {
+ deployBridge(config);
+ }
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ for (BroadcastGroup group: broadcastGroups.values())
+ {
+ if (!backup)
{
- conn.announceNode();
- if (backup)
- {
- conn.announceBackup();
- }
+ group.start();
}
}
+
+ for (ClusterConnection conn : clusterConnections.values())
+ {
+ conn.start();
+ if (backup)
+ {
+ conn.informTopology();
+ conn.announceBackup();
+ }
+ }
- for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+ for (Bridge bridge : bridges.values())
{
- deployBridge(config);
+ if (!backup)
+ {
+ bridge.start();
+ }
}
+
started = true;
}
@@ -255,7 +276,7 @@
clusterLocators.clear();
started = false;
- clusterConnections.clear();
+ clearClusterConnections();
}
public void flushExecutor()
@@ -487,10 +508,6 @@
managementService.registerBridge(bridge, config);
- if (!backup)
- {
- bridge.start();
- }
}
public void destroyBridge(final String name) throws Exception
@@ -536,11 +553,18 @@
e.printStackTrace();
}
}
- clusterConnections.clear();
+ clearClusterConnections();
}
// Private methods ----------------------------------------------------------------------------------------------------
+
+ private void clearClusterConnections()
+ {
+ clusterConnections.clear();
+ this.defaultClusterConnection = null;
+ }
+
private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -673,7 +697,6 @@
{
log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
- clusterConnection.start();
}
private Transformer instantiateTransformer(final String transformerClassName)
@@ -748,11 +771,6 @@
broadcastGroups.put(config.getName(), group);
managementService.registerBroadcastGroup(group, config);
-
- if (!backup)
- {
- group.start();
- }
}
private void logWarnNoConnector(final String connectorName, final String bgName)
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -227,7 +227,12 @@
// Used to identify the server on tests... useful on debugging testcases
private String identity;
+
+ private Thread backupActivationThread;
+ private Activation activation;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -289,11 +294,6 @@
// lifecycle methods
// ----------------------------------------------------------------
- private interface Activation extends Runnable
- {
- void close(boolean permanently) throws Exception;
- }
-
/*
* Can be overridden for tests
*/
@@ -309,259 +309,6 @@
}
}
- private class NoSharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- initialisePart1();
-
- initialisePart2();
-
- if (identity != null)
- {
- log.info("Server " + identity + " is now live");
- }
- else
- {
- log.info("Server is now live");
- }
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
-
- }
- }
-
- private class SharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- log.info("Waiting to obtain live lock");
-
- checkJournalDirectory();
-
- initialisePart1();
-
- if(nodeManager.isBackupLive())
- {
- //looks like we've failed over at some point need to inform that we are the backup so when the current live
- // goes down they failover to us
- clusterManager.announceBackup();
- Thread.sleep(configuration.getFailbackDelay());
- }
-
- nodeManager.startLiveNode();
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- log.info("Server is now live");
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
-
-
- private class SharedStoreBackupActivation implements Activation
- {
-
- volatile boolean closed = false;
- public void run()
- {
- try
- {
- nodeManager.startBackup();
-
- initialisePart1();
-
- clusterManager.start();
-
- started = true;
-
- log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
-
- nodeManager.awaitLiveNode();
-
- configuration.setBackup(false);
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- clusterManager.activate();
-
- log.info("Backup Server is now live");
-
- nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
- {
- class FailbackChecker implements Runnable
- {
- boolean restarting = false;
- public void run()
- {
- try
- {
- if(!restarting && nodeManager.isAwaitingFailback())
- {
- log.info("live server wants to restart, restarting server in backup");
- restarting = true;
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
- stop(true);
- // We need to wait some time before we start the backup again
- // otherwise we may eventually start before the live had a chance to get it
- Thread.sleep(configuration.getFailbackDelay());
- configuration.setBackup(true);
- log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
- start();
- }
- catch (Exception e)
- {
- log.warn("unable to restart server, please kill and restart manually", e);
- }
- }
- });
- t.start();
- }
- }
- catch (Exception e)
- {
- log.debug(e.getMessage(), e);
- //hopefully it will work next call
- }
- }
- }
- scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
- }
- }
- catch (InterruptedException e)
- {
- //this is ok, we are being stopped
- }
- catch (ClosedChannelException e)
- {
- //this is ok too, we are being stopped
- }
- catch (Exception e)
- {
- if(!(e.getCause() instanceof InterruptedException))
- {
- log.error("Failure in initialisation", e);
- }
- }
- catch(Throwable e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if (configuration.isBackup())
- {
- long timeout = 30000;
-
- long start = System.currentTimeMillis();
-
- while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
- {
- nodeManager.interrupt();
-
- backupActivationThread.interrupt();
-
- backupActivationThread.join(1000);
-
- }
-
- if (System.currentTimeMillis() - start >= timeout)
- {
- threadDump("Timed out waiting for backup activation to exit");
- }
-
- nodeManager.stopBackup();
- }
- else
- {
- //if we are now live, behave as live
- // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
- // started before the live
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
- }
-
- private class SharedNothingBackupActivation implements Activation
- {
- public void run()
- {
- try
- {
- // TODO
-
- // Try-Connect to live server using live-connector-ref
-
- // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- }
- }
-
- private Thread backupActivationThread;
-
- private Activation activation;
-
public synchronized void start() throws Exception
{
stopped = false;
@@ -611,6 +358,7 @@
}
+ // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
if (configuration.isBackup())
{
if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
return new HashSet<ServerSession>(sessions.values());
}
- // TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
return connectorsService;
}
- // Public
- // ---------------------------------------------------------------------------------------
+
+ public synchronized boolean checkActivate() throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ // Handle backup server activation
+ if (!configuration.isSharedStore())
+ {
+ if (replicationEndpoint == null)
+ {
+ HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
+ }
+
+ replicationEndpoint.stop();
+ }
+
+ // Complete the startup procedure
+
+ HornetQServerImpl.log.info("Activating backup server");
+
+ configuration.setBackup(false);
+
+ initialisePart2();
+ }
+
+ return true;
+ }
+
+ public void deployDivert(DivertConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getForwardingAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ SimpleString sName = new SimpleString(config.getName());
+
+ if (postOffice.getBinding(sName) != null)
+ {
+ HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
+
+ return;
+ }
+
+ SimpleString sAddress = new SimpleString(config.getAddress());
+
+ Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+ sName,
+ new SimpleString(config.getRoutingName()),
+ config.isExclusive(),
+ filter,
+ transformer,
+ postOffice,
+ storageManager);
+
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerDivert(divert, config);
+ }
+
+ public void destroyDivert(SimpleString name) throws Exception
+ {
+ Binding binding = postOffice.getBinding(name);
+ if (binding == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
+ }
+ if (!(binding instanceof DivertBinding))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
+ }
+
+ postOffice.removeBinding(name);
+ }
+
+
+
+ public void deployBridge(BridgeConfiguration config) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.deployBridge(config);
+ }
+ }
+
+ public void destroyBridge(String name) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.destroyBridge(name);
+ }
+ }
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
+
+ // PUBLIC -------
+
+ public String toString()
+ {
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ }
+ }
+
+
+
// Package protected
// ----------------------------------------------------------------------------
@@ -1296,34 +1179,6 @@
// Private
// --------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // String backupConnectorName = configuration.getBackupConnectorName();
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
- // "' is not defined in the configuration.");
- // }
- // else
- // {
- //
- // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
- // replicationManager.start();
- // }
- // }
- //
- // return true;
- // }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
}
}
- public synchronized boolean checkActivate() throws Exception
- {
- if (configuration.isBackup())
- {
- // Handle backup server activation
-
- if (!configuration.isSharedStore())
- {
- if (replicationEndpoint == null)
- {
- HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
-
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
- }
-
- replicationEndpoint.stop();
- }
-
- // Complete the startup procedure
-
- HornetQServerImpl.log.info("Activating backup server");
-
- configuration.setBackup(false);
-
- initialisePart2();
- }
-
- return true;
- }
-
- private class FileActivateRunner implements Runnable
- {
- public void run()
- {
-
- }
- }
-
private void initialiseLogging()
{
LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1478,8 +1295,11 @@
nodeManager.getUUID(),
configuration.isBackup(),
configuration.isClustered());
+
+ clusterManager.deploy();
+
remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
messagingServerControl = managementService.registerServer(postOffice,
@@ -1826,76 +1646,6 @@
}
}
- public void deployDivert(DivertConfiguration config) throws Exception
- {
- if (config.getName() == null)
- {
- HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
-
- return;
- }
-
- if (config.getAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
-
- return;
- }
-
- if (config.getForwardingAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
-
- return;
- }
-
- SimpleString sName = new SimpleString(config.getName());
-
- if (postOffice.getBinding(sName) != null)
- {
- HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
-
- return;
- }
-
- SimpleString sAddress = new SimpleString(config.getAddress());
-
- Transformer transformer = instantiateTransformer(config.getTransformerClassName());
-
- Filter filter = FilterImpl.createFilter(config.getFilterString());
-
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
- sName,
- new SimpleString(config.getRoutingName()),
- config.isExclusive(),
- filter,
- transformer,
- postOffice,
- storageManager);
-
- Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
-
- postOffice.addBinding(binding);
-
- managementService.registerDivert(divert, config);
- }
-
- public void destroyDivert(SimpleString name) throws Exception
- {
- Binding binding = postOffice.getBinding(name);
- if (binding == null)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
- }
- if (!(binding instanceof DivertBinding))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
- }
-
- postOffice.removeBinding(name);
- }
-
-
private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
{
if (config != null)
@@ -1922,22 +1672,6 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
- public void deployBridge(BridgeConfiguration config) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.deployBridge(config);
- }
- }
-
- public void destroyBridge(String name) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.destroyBridge(name);
- }
- }
private Transformer instantiateTransformer(final String transformerClassName)
{
@@ -1979,11 +1713,6 @@
}
- public ServerSession getSessionByID(String sessionName)
- {
- return sessions.get(sessionName);
- }
-
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -2005,18 +1734,284 @@
}
}
- public String toString()
+ /**
+ * To be called by backup trying to fail back the server
+ */
+ private void startFailbackChecker()
{
- if (identity != null)
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
+ }
+
+
+ // Inner classes
+ // --------------------------------------------------------------------------------
+
+ class FailbackChecker implements Runnable
+ {
+ boolean restarting = false;
+ public void run()
{
- return "HornetQServerImpl::" + identity;
+ try
+ {
+ if(!restarting && nodeManager.isAwaitingFailback())
+ {
+ log.info("live server wants to restart, restarting server in backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
+ stop(true);
+ // We need to wait some time before we start the backup again
+ // otherwise we may eventually start before the live had a chance to get it
+ Thread.sleep(configuration.getFailbackDelay());
+ configuration.setBackup(true);
+ log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
+ start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to restart server, please kill and restart manually", e);
+ }
+ }
+ });
+ t.start();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
- else
+ }
+
+
+
+ private class SharedStoreLiveActivation implements Activation
+ {
+ public void run()
{
- return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ try
+ {
+ log.info("Waiting to obtain live lock");
+
+ checkJournalDirectory();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("First part initialization on " + this);
+ }
+
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ //looks like we've failed over at some point need to inform that we are the backup so when the current live
+ // goes down they failover to us
+ if (log.isDebugEnabled())
+ {
+ log.debug("announcing backup to the former live" + this);
+ }
+
+ clusterManager.announceBackup();
+ Thread.sleep(configuration.getFailbackDelay());
+ }
+
+ nodeManager.startLiveNode();
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ log.info("Server is now live");
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
}
+
+ public void close(boolean permanently) throws Exception
+ {
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
}
- // Inner classes
- // --------------------------------------------------------------------------------
+
+ private class SharedStoreBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ nodeManager.startBackup();
+
+ initialisePart1();
+
+ clusterManager.start();
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ configuration.setBackup(false);
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ clusterManager.activate();
+
+ log.info("Backup Server is now live");
+
+ nodeManager.releaseBackup();
+ if(configuration.isAllowAutoFailBack())
+ {
+ startFailbackChecker();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ //this is ok, we are being stopped
+ }
+ catch (ClosedChannelException e)
+ {
+ //this is ok too, we are being stopped
+ }
+ catch (Exception e)
+ {
+ if(!(e.getCause() instanceof InterruptedException))
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+ catch(Throwable e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ /**
+ *
+ */
+ public void close(boolean permanently) throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ long timeout = 30000;
+
+ long start = System.currentTimeMillis();
+
+ while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+ {
+ nodeManager.interrupt();
+
+ backupActivationThread.interrupt();
+
+ backupActivationThread.join(1000);
+
+ }
+
+ if (System.currentTimeMillis() - start >= timeout)
+ {
+ threadDump("Timed out waiting for backup activation to exit");
+ }
+
+ nodeManager.stopBackup();
+ }
+ else
+ {
+ //if we are now live, behave as live
+ // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
+ // started before the live
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+ }
+
+ private interface Activation extends Runnable
+ {
+ void close(boolean permanently) throws Exception;
+ }
+
+ private class SharedNothingBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ // TODO
+
+ // Try-Connect to live server using live-connector-ref
+
+ // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+ }
+ }
+
+ private class NoSharedStoreLiveActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ initialisePart1();
+
+ initialisePart2();
+
+ if (identity != null)
+ {
+ log.info("Server " + identity + " is now live");
+ }
+ else
+ {
+ log.info("Server is now live");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+
+ }
+ }
+
+
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -1062,7 +1062,7 @@
return;
}
-
+
consumer.receiveCredits(credits);
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
+ ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
ClientSession session = sendAndConsume(sf, true);
+
+ System.out.println(locator.getTopology().describe());
MyListener listener = new MyListener(latch);
session.addFailureListener(listener);
+
+ System.out.println(locator.getTopology().describe());
liveServer.crash();
-
+
assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ log.info("backup (nowLive) topology = " + backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+
+ log.info("Server Crash!!!");
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -97,6 +108,11 @@
producer.send(message);
+ verifyMessageOnServer(1, 1);
+
+ System.out.println(locator.getTopology().describe());
+
+
session.removeFailureListener(listener);
final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
log.info("******* starting live server back");
liveServer.start();
+
+ Thread.sleep(1000);
+
+ System.out.println("After failback: " + locator.getTopology().describe());
assertTrue(latch2.await(5, TimeUnit.SECONDS));
@@ -118,6 +138,8 @@
session.close();
+ verifyMessageOnServer(0, 1);
+
sf.close();
Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
Assert.assertEquals(0, sf.numConnections());
}
+ /**
+ * @throws Exception
+ * @throws HornetQException
+ */
+ private void verifyMessageOnServer(final int server, final int numberOfMessages) throws Exception, HornetQException
+ {
+ ServerLocator backupLocator = createInVMLocator(server);
+ ClientSessionFactory factorybkp = backupLocator.createSessionFactory();
+ ClientSession sessionbkp = factorybkp.createSession(false, false);
+ sessionbkp.start();
+ ClientConsumer consumerbkp = sessionbkp.createConsumer(ADDRESS);
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumerbkp.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ sessionbkp.commit();
+ }
+ sessionbkp.close();
+ factorybkp.close();
+ backupLocator.close();
+ }
+
public void testAutoFailbackThenFailover() throws Exception
{
locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
if (createQueue)
{
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
}
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
}
ClientMessage message3 = consumer.receiveImmediate();
+
+ consumer.close();
Assert.assertNull(message3);
@@ -315,6 +362,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ System.out.println("Failed, me");
latch.countDown();
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -38,6 +38,7 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -530,7 +531,19 @@
locators.add(locatorWithoutHA);
return locatorWithoutHA;
}
+
+ protected ServerLocator createInVMLocator(final int serverID)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (serverID != 0)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+ }
+
+ return HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+ }
+
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));
13 years, 3 months
JBoss hornetq SVN: r11358 - in branches/Branch_2_2_EAP_cluster4: src/main/org/hornetq/core/client/impl and 16 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-16 22:31:55 -0400 (Fri, 16 Sep 2011)
New Revision: 11358
Modified:
branches/Branch_2_2_EAP_cluster4/hornetq.ipr
branches/Branch_2_2_EAP_cluster4/hornetq.iws
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/Acceptor.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Cluster cleanup
Modified: branches/Branch_2_2_EAP_cluster4/hornetq.ipr
===================================================================
--- branches/Branch_2_2_EAP_cluster4/hornetq.ipr 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/hornetq.ipr 2011-09-17 02:31:55 UTC (rev 11358)
@@ -1,5 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
+ <component name="ASMPluginConfiguration">
+ <asm skipDebug="false" skipFrames="false" skipCode="false" expandFrames="false" />
+ <groovy codeStyle="LEGACY" />
+ </component>
<component name="AntConfiguration">
<defaultAnt bundledAnt="true" />
<buildFile url="file://$PROJECT_DIR$/build-hornetq.xml">
@@ -246,14 +250,6 @@
<maximumStackSize value="32" />
<properties />
</buildFile>
- <buildFile url="file://$PROJECT_DIR$/examples/soak/normal/build.xml">
- <additionalClassPath />
- <antReference projectDefault="true" />
- <customJdkName value="" />
- <maximumHeapSize value="128" />
- <maximumStackSize value="32" />
- <properties />
- </buildFile>
</component>
<component name="BuildJarProjectSettings">
<option name="BUILD_JARS_ON_MAKE" value="false" />
@@ -273,6 +269,7 @@
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="jsp">
<option name="INDENT_SIZE" value="4" />
@@ -282,6 +279,7 @@
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="xml">
<option name="INDENT_SIZE" value="4" />
@@ -291,6 +289,7 @@
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
+ <option name="USE_RELATIVE_INDENTS" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
</value>
</option>
@@ -326,7 +325,7 @@
<element module="All" copyright="new" />
</module2copyright>
</component>
- <component name="CppTools.Loader" reportImplicitCastToBool="false" warnedAboutFileOutOfSourceRoot="true" version="1" currentProject="$PROJECT_DIR$/native/Makefile" />
+ <component name="CppTools.Loader" reportImplicitCastToBool="false" reportNameReferencedOnce="false" warnedAboutFileOutOfSourceRoot="true" version="3" currentProject="$PROJECT_DIR$/native/Makefile" compilerSelect="AUTO" />
<component name="DependenciesAnalyzeManager">
<option name="myForwardDirection" value="false" />
</component>
@@ -365,22 +364,22 @@
</facet-type>
<facet-type id="javaeeApplication">
<modules>
- <module name="messaging" />
<module name="hornetq-javaee-examples">
<files>
<file url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/META-INF/application.xml" />
</files>
</module>
+ <module name="messaging" />
</modules>
</facet-type>
<facet-type id="web">
<modules>
- <module name="messaging" />
<module name="hornetq-javaee-examples">
<files>
<file url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/WEB-INF/web.xml" />
</files>
</module>
+ <module name="messaging" />
</modules>
</facet-type>
</autodetection-disabled>
@@ -631,6 +630,9 @@
<module fileurl="file://$PROJECT_DIR$/tests/hornetq-tests.iml" filepath="$PROJECT_DIR$/tests/hornetq-tests.iml" />
</modules>
</component>
+ <component name="ProjectResources">
+ <default-html-doctype>http://www.w3.org/1999/xhtml</default-html-doctype>
+ </component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true" project-jdk-name="1.6" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/classes" />
</component>
@@ -701,87 +703,6 @@
</option>
</component>
<component name="libraryTable">
- <library name="messaging">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/src/config/common" />
- <root url="file://$PROJECT_DIR$/src/config/stand-alone/non-clustered" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- <jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />
- <jarDirectory url="file://$PROJECT_DIR$/tests/etc" recursive="false" />
- <jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />
- </library>
- <library name="messaging_jars">
- <CLASSES>
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-mdr.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/microcontainer/lib/jboss-kernel.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/sun-jaxb/lib/jaxb-api.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/integration/lib/jboss-transaction-spi.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jbossts/lib/jbossjts.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-transaction-api.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jca-api.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/microcontainer/lib/jboss-dependency.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jbossxb.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/net/java/dev/javacc/lib/javacc.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/security/lib/jbosssx.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/apache-xerces/lib/xml-apis.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-reflect.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/apache-xerces/lib/xercesImpl.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/wutka-dtdparser/lib/dtdparser121.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jms-api.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/javax/activation/lib/activation.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/security/lib/jboss-security-spi.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-common-core.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/log4j/lib/log4j.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/netty/lib/netty.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/logging/lib/jboss-logging-spi.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/naming/lib/jnpserver.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/junit/lib/junit.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jaspi-api.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jbossts/lib/jbossts-common.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/apache-logging/lib/commons-logging.jar!/" />
- <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar!/" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
- <library name="messaging-tests">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/tests/config" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
- <library name="messaging-joram-tests">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/tests/joram-tests/config" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
- <library name="messaging-jms-tests">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/tests/jms-tests/config" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
- <library name="messaging-jms-examples">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/examples/jms/common/config" />
- <root url="file://$PROJECT_DIR$/src/config/common" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
- <library name="messaging-javaee-examples">
- <CLASSES>
- <root url="file://$PROJECT_DIR$/src/config/common" />
- </CLASSES>
- <JAVADOC />
- <SOURCES />
- </library>
<library name="ant 1.7.1">
<CLASSES>
<root url="jar://$PROJECT_DIR$/../../devtools/apache-ant-1.7.1/lib/ant-contrib/lib/commons-logging-1.0.4.jar!/" />
@@ -828,6 +749,87 @@
<JAVADOC />
<SOURCES />
</library>
+ <library name="messaging">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/src/config/common" />
+ <root url="file://$PROJECT_DIR$/src/config/stand-alone/non-clustered" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ <jarDirectory url="file://$PROJECT_DIR$/src/etc" recursive="false" />
+ <jarDirectory url="file://$PROJECT_DIR$/tests/etc" recursive="false" />
+ <jarDirectory url="file://$PROJECT_DIR$/tests/jms-tests/etc" recursive="false" />
+ </library>
+ <library name="messaging-javaee-examples">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/src/config/common" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="messaging-jms-examples">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/examples/jms/common/config" />
+ <root url="file://$PROJECT_DIR$/src/config/common" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="messaging-jms-tests">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/tests/jms-tests/config" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="messaging-joram-tests">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/tests/joram-tests/config" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="messaging-tests">
+ <CLASSES>
+ <root url="file://$PROJECT_DIR$/tests/config" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="messaging_jars">
+ <CLASSES>
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-mdr.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/microcontainer/lib/jboss-kernel.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/sun-jaxb/lib/jaxb-api.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/integration/lib/jboss-transaction-spi.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jbossts/lib/jbossjts.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-transaction-api.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jca-api.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/microcontainer/lib/jboss-dependency.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jbossxb.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/net/java/dev/javacc/lib/javacc.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/security/lib/jbosssx.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-xerces/lib/xml-apis.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-reflect.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-xerces/lib/xercesImpl.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/wutka-dtdparser/lib/dtdparser121.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jms-api.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/javax/activation/lib/activation.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/security/lib/jboss-security-spi.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/lib/jboss-common-core.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/log4j/lib/log4j.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/netty/lib/netty.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/logging/lib/jboss-logging-spi.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/naming/lib/jnpserver.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/junit/lib/junit.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/javaee/lib/jboss-jaspi-api.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/jboss/jbossts/lib/jbossts-common.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/apache-logging/lib/commons-logging.jar!/" />
+ <root url="jar://$PROJECT_DIR$/thirdparty/org/jboss/ejb3/lib/jboss-ejb3-ext-api.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
</component>
</project>
Modified: branches/Branch_2_2_EAP_cluster4/hornetq.iws
===================================================================
--- branches/Branch_2_2_EAP_cluster4/hornetq.iws 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/hornetq.iws 2011-09-17 02:31:55 UTC (rev 11358)
@@ -2,46 +2,31 @@
<project version="4">
<component name="ChangeListManager">
<list default="true" readonly="true" id="a2aae645-dbcd-4d6e-9c99-efa05d93589a" name="Default" comment="">
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server/ra.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/server" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-configuration.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/readme.html" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/server/MDBQueue.java" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-jms.xml" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml" afterPath="$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/config/META-INF/application.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/build.sh" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/MDBRemoteServerClientExample.java" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/build.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/examples/javaee/common/build.xml" afterPath="$PROJECT_DIR$/examples/javaee/common/build.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/config/ant.properties" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/build.bat" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-users.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server/hornetq-configuration.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server/hornetq-jms.xml" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/docs/user-manual/en/examples.xml" afterPath="$PROJECT_DIR$/docs/user-manual/en/examples.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server/jms-ds.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/config/META-INF" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0/client-jndi.properties" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-beans.xml" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/config/client.jndi.properties" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/javaee/jca-remote/config" />
- </list>
- <list id="5341122e-b51c-4e90-b798-7086790ef7e8" name="intellij" comment="">
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.iml" afterPath="$PROJECT_DIR$/hornetq.iml" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/examples/javaee/hornetq-javaee-examples.iml" afterPath="$PROJECT_DIR$/examples/javaee/hornetq-javaee-examples.iml" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.iws" afterPath="$PROJECT_DIR$/hornetq.iws" />
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/examples/jms/hornetq-jms-examples.iml" afterPath="$PROJECT_DIR$/examples/jms/hornetq-jms-examples.iml" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.ipr" afterPath="$PROJECT_DIR$/hornetq.ipr" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.iws" afterPath="$PROJECT_DIR$/hornetq.iws" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/ClusterConnection.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/ClusterConnection.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/ClusterManager.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/ClusterManager.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/spi/core/remoting/Acceptor.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/spi/core/remoting/Acceptor.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/util/ServiceTestBase.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/util/ServiceTestBase.java" />
</list>
+ <list id="5341122e-b51c-4e90-b798-7086790ef7e8" name="intellij" comment="" />
<ignored path=".idea/workspace.xml" />
<ignored path="messaging.iws" />
<option name="TRACKING_ENABLED" value="true" />
@@ -73,7 +58,7 @@
<option name="CONDITION" value="" />
<option name="LOG_MESSAGE" value="" />
</breakpoint>
- <breakpoint url="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java" line="847" class="org.hornetq.core.server.cluster.impl.ClusterConnectionImpl.MessageFlowRecordImpl" package="org.hornetq.core.server.cluster.impl">
+ <breakpoint url="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java" line="835" class="org.hornetq.core.server.cluster.impl.ClusterConnectionImpl" package="org.hornetq.core.server.cluster.impl">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
@@ -138,7 +123,7 @@
<option name="CONDITION" value="" />
<option name="LOG_MESSAGE" value="" />
</breakpoint>
- <breakpoint url="file://$PROJECT_DIR$/src/main/org/hornetq/integration/jboss/security/JBossASSecurityManager.java" line="145" class="org.hornetq.integration.jboss.security.JBossASSecurityManager" package="org.hornetq.integration.jboss.security">
+ <breakpoint url="file://$PROJECT_DIR$/src/main/org/hornetq/integration/jboss/security/JBossASSecurityManager.java" line="145" class="org.hornetq.integration.jboss.security.JBossASSecurityManager$1" package="org.hornetq.integration.jboss.security">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
@@ -242,7 +227,7 @@
<option name="CONDITION" value="" />
<option name="LOG_MESSAGE" value="" />
</breakpoint>
- <breakpoint url="file://$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java" line="234" class="org.hornetq.tests.integration.ra.HornetQMessageHandlerTest.DummyMessageEndpoint" package="org.hornetq.tests.integration.ra">
+ <breakpoint url="file://$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java" line="234" class="org.hornetq.tests.integration.ra.HornetQMessageHandlerTest" package="org.hornetq.tests.integration.ra">
<option name="ENABLED" value="true" />
<option name="LOG_ENABLED" value="false" />
<option name="LOG_EXPRESSION_ENABLED" value="false" />
@@ -320,72 +305,67 @@
<component name="FavoritesManager">
<favorites_list name="messaging" />
</component>
- <component name="FileColors" enabled="true" enabledForTabs="true" />
<component name="FileEditorManager">
<leaf>
- <file leaf-file-name="MDBRemoteServerClientExample.java" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/MDBRemoteServerClientExample.java">
+ <file leaf-file-name="ClusterConnectionImpl.java" pinned="false" current="true" current-in-tab="true">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="95" column="17" selection-start="3054" selection-end="3226" vertical-scroll-proportion="0.0">
- <folding>
- <element signature="imports" expanded="true" />
- </folding>
+ <state line="368" column="21" selection-start="11960" selection-end="11960" vertical-scroll-proportion="0.35629454">
+ <folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="readme.html" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/readme.html">
+ <file leaf-file-name="ClusterManagerImpl.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="46" column="44" selection-start="2573" selection-end="3415" vertical-scroll-proportion="-16.304348">
+ <state line="592" column="40" selection-start="18885" selection-end="18908" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="appserver-integration.xml" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml">
+ <file leaf-file-name="HornetQComponent.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/HornetQComponent.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="836" column="101" selection-start="48141" selection-end="48141" vertical-scroll-proportion="-15.04">
+ <state line="28" column="8" selection-start="929" selection-end="929" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="examples.xml" pinned="false" current="true" current-in-tab="true">
- <entry file="file://$PROJECT_DIR$/docs/user-manual/en/examples.xml">
+ <file leaf-file-name="PagingStoreImpl.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="534" column="124" selection-start="33920" selection-end="33920" vertical-scroll-proportion="0.4304762">
+ <state line="65" column="13" selection-start="2716" selection-end="2716" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="ra.xml" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server/ra.xml">
+ <file leaf-file-name="PageCursorProviderImpl.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="61" column="27" selection-start="2184" selection-end="3025" vertical-scroll-proportion="0.0">
+ <state line="46" column="13" selection-start="1822" selection-end="1822" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="hornetq-configuration.xml" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-configuration.xml">
+ <file leaf-file-name="PageSubscription.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/PageSubscription.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="46" column="49" selection-start="1976" selection-end="1976" vertical-scroll-proportion="0.0">
+ <state line="38" column="0" selection-start="1213" selection-end="1213" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="MDBQueue.java" pinned="false" current="false" current-in-tab="false">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/server/MDBQueue.java">
+ <file leaf-file-name="PageSubscriptionImpl.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="46" column="25" selection-start="1849" selection-end="1865" vertical-scroll-proportion="0.0">
- <folding>
- <element signature="imports" expanded="true" />
- </folding>
+ <state line="704" column="12" selection-start="19472" selection-end="19472" vertical-scroll-proportion="0.0">
+ <folding />
</state>
</provider>
</entry>
@@ -427,8 +407,6 @@
<component name="IdeDocumentHistory">
<option name="changedFiles">
<list>
- <option value="$PROJECT_DIR$/examples/javaee/jca-config/build.xml" />
- <option value="$PROJECT_DIR$/examples/javaee/jca-remote/config/jndi.properties" />
<option value="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-beans.xml" />
<option value="$PROJECT_DIR$/examples/javaee/jca-remote/server/hornetq-configuration.xml" />
<option value="$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-jms.xml" />
@@ -443,6 +421,8 @@
<option value="$PROJECT_DIR$/examples/javaee/jca-remote/readme.html" />
<option value="$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml" />
<option value="$PROJECT_DIR$/docs/user-manual/en/examples.xml" />
+ <option value="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java" />
+ <option value="$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java" />
</list>
</option>
</component>
@@ -455,7 +435,7 @@
<profile-state />
</entry>
</component>
- <component name="ProjectLevelVcsManager">
+ <component name="ProjectLevelVcsManager" settingsEditedManually="false">
<OptionsSetting value="true" id="Add" />
<OptionsSetting value="true" id="Remove" />
<OptionsSetting value="true" id="Checkout" />
@@ -1367,7 +1347,6 @@
<sortByType />
</navigator>
<panes>
- <pane id="PackagesPane" />
<pane id="ProjectPane">
<subPane>
<PATH>
@@ -1382,7 +1361,7 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@@ -1392,11 +1371,11 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="mdb-bmt" />
+ <option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@@ -1406,19 +1385,15 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="mdb-bmt" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
<option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="example" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@@ -1428,35 +1403,21 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="hornetq" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="src" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="example" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
</PATH>
<PATH>
<PATH_ELEMENT>
@@ -1464,23 +1425,23 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="src" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="example" />
+ <option name="myItemId" value="hornetq" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="server" />
+ <option name="myItemId" value="core" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@@ -1490,29 +1451,23 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="server0" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
<option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="core" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
@@ -1526,43 +1481,31 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-remote" />
+ <option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="config" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
<option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="jca-config" />
+ <option name="myItemId" value="core" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="javaee" />
+ <option name="myItemId" value="server" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="common" />
+ <option name="myItemId" value="cluster" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
</PATH>
@@ -1572,114 +1515,41 @@
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="jms" />
+ <option name="myItemId" value="trunk" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="jms" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="queue" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- </PATH>
- <PATH>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="jms" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="queue" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
<option name="myItemId" value="src" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="example" />
+ <option name="myItemId" value="main" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
<option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="common" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="common" />
+ <option name="myItemId" value="core" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="src" />
+ <option name="myItemId" value="server" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
<PATH_ELEMENT>
- <option name="myItemId" value="example" />
+ <option name="myItemId" value="cluster" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- </PATH>
- <PATH>
<PATH_ELEMENT>
- <option name="myItemId" value="hornetq" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewProjectNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="hornetq-jms-examples" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.ProjectViewModuleNode" />
- </PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="common" />
+ <option name="myItemId" value="impl" />
<option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
</PATH_ELEMENT>
- <PATH_ELEMENT>
- <option name="myItemId" value="config" />
- <option name="myItemType" value="com.intellij.ide.projectView.impl.nodes.PsiDirectoryNode" />
- </PATH_ELEMENT>
</PATH>
</subPane>
</pane>
+ <pane id="PackagesPane" />
<pane id="Scope" />
<pane id="Favorites" />
</panes>
@@ -1765,6 +1635,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
@@ -1814,6 +1685,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="50830" />
<option name="TRANSPORT" value="0" />
@@ -1847,6 +1719,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
@@ -1874,17 +1747,11 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
</configuration>
- <configuration default="true" type="PHPUnitRunConfigurationType" factoryName="PHPUnit">
- <method>
- <option name="AntTarget" enabled="false" />
- <option name="BuildArtifacts" enabled="false" />
- <option name="Maven.BeforeRunTask" enabled="false" />
- </method>
- </configuration>
<configuration default="true" type="Remote" factoryName="Remote">
<option name="USE_SOCKET_TRANSPORT" value="true" />
<option name="SERVER_MODE" value="false" />
@@ -1953,6 +1820,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<method>
<option name="AntTarget" enabled="false" />
<option name="BuildArtifacts" enabled="false" />
@@ -1983,6 +1851,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="52003" />
<option name="TRANSPORT" value="0" />
@@ -2016,6 +1885,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Run" />
<ConfigurationWrapper RunnerId="Run" />
<method />
@@ -2043,6 +1913,7 @@
<value defaultName="moduleWithDependencies" />
</option>
<envs />
+ <patterns />
<RunnerSettings RunnerId="Debug">
<option name="DEBUG_PORT" value="59963" />
<option name="TRANSPORT" value="0" />
@@ -2064,13 +1935,7 @@
<option name="TRANSPORT" value="0" />
<option name="LOCAL" value="false" />
</RunnerSettings>
- <RunnerSettings RunnerId="Debug">
- <option name="DEBUG_PORT" value="5005" />
- <option name="TRANSPORT" value="0" />
- <option name="LOCAL" value="false" />
- </RunnerSettings>
<ConfigurationWrapper RunnerId="Debug" />
- <ConfigurationWrapper RunnerId="Debug" />
<method />
</configuration>
<configuration default="false" name="EnqueueDurable" type="Application" factoryName="Application">
@@ -2117,14 +1982,15 @@
<option name="USER" value="" />
<option name="PASSWORD" value="" />
<option name="LAST_MERGED_REVISION" />
- <option name="UPDATE_RUN_STATUS" value="false" />
<option name="MERGE_DRY_RUN" value="false" />
<option name="MERGE_DIFF_USE_ANCESTRY" value="true" />
<option name="UPDATE_LOCK_ON_DEMAND" value="false" />
<option name="IGNORE_SPACES_IN_MERGE" value="false" />
<option name="DETECT_NESTED_COPIES" value="false" />
+ <option name="CHECK_NESTED_FOR_QUICK_MERGE" value="false" />
<option name="IGNORE_SPACES_IN_ANNOTATE" value="true" />
<option name="SHOW_MERGE_SOURCES_IN_ANNOTATE" value="true" />
+ <option name="FORCE_UPDATE" value="false" />
<configuration useDefault="false">$PROJECT_DIR$/../../.subversion</configuration>
<myIsUseDefaultProxy>false</myIsUseDefaultProxy>
<supportedVersion>125</supportedVersion>
@@ -2183,26 +2049,27 @@
</todo-panel>
</component>
<component name="ToolWindowManager">
- <frame x="-3" y="25" width="1926" height="1033" extended-state="6" />
- <editor active="false" />
+ <frame x="0" y="22" width="1680" height="1024" extended-state="0" />
+ <editor active="true" />
<layout>
- <window_info id="Changes" active="true" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.34723788" sideWeight="0.0" order="7" side_tool="false" content_ui="tabs" />
+ <window_info id="Changes" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.34623894" sideWeight="0.0" order="7" side_tool="false" content_ui="tabs" />
<window_info id="Palette" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="Ant Build" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
- <window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.48703495" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Debug" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.17925592" sideWeight="0.0" order="3" side_tool="false" content_ui="tabs" />
+ <window_info id="ASM" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
<window_info id="Version Control" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.17587373" sideWeight="0.0" order="7" side_tool="false" content_ui="tabs" />
- <window_info id="Messages" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.16344294" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
<window_info id="TODO" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="6" side_tool="false" content_ui="tabs" />
- <window_info id="Structure" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.24959914" sideWeight="0.7006937" order="1" side_tool="false" content_ui="tabs" />
+ <window_info id="Structure" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.24954017" sideWeight="0.5769231" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Maven Projects" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="3" side_tool="false" content_ui="tabs" />
- <window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.17263496" sideWeight="0.64487034" order="0" side_tool="false" content_ui="tabs" />
+ <window_info id="Commander" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
+ <window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.17228694" sideWeight="0.8373894" order="0" side_tool="false" content_ui="tabs" />
<window_info id="Dependency Viewer" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
<window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.20599613" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
<window_info id="Cvs" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="4" side_tool="false" content_ui="tabs" />
<window_info id="Message" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
+ <window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.42307693" sideWeight="0.5" order="1" side_tool="false" content_ui="tabs" />
<window_info id="Dataflow to this" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
- <window_info id="Commander" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="0" side_tool="false" content_ui="tabs" />
+ <window_info id="Messages" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.16344294" sideWeight="0.5" order="7" side_tool="false" content_ui="tabs" />
<window_info id="Hierarchy" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="2" side_tool="false" content_ui="tabs" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="5" side_tool="false" content_ui="tabs" />
</layout>
@@ -2219,6 +2086,8 @@
<option name="CHECK_LOCALLY_CHANGED_CONFLICTS_IN_BACKGROUND" value="true" />
<option name="ENABLE_BACKGROUND_PROCESSES" value="false" />
<option name="CHANGED_ON_SERVER_INTERVAL" value="60" />
+ <option name="SHOW_ONLY_CHANGED_IN_SELECTION_DIFF" value="true" />
+ <option name="CHECK_COMMIT_MESSAGE_SPELLING" value="true" />
<option name="FORCE_NON_EMPTY_COMMENT" value="false" />
<option name="LAST_COMMIT_MESSAGE" value="added jca example showing connecting to remote HQ server and doc updates" />
<option name="MAKE_NEW_CHANGELIST_ACTIVE" value="true" />
@@ -2840,138 +2709,98 @@
<verbose value="true" />
<viewClosedWhenNoErrors value="false" />
</buildFile>
- <buildFile url="file://$PROJECT_DIR$/examples/soak/normal/build.xml">
- <antCommandLine value="" />
- <runInBackground value="true" />
- <targetFilters>
- <filter targetName="runReceiver" isVisible="false" />
- <filter targetName="runExample" isVisible="false" />
- <filter targetName="runServer" isVisible="false" />
- <filter targetName="clean" isVisible="false" />
- <filter targetName="compile" isVisible="false" />
- <filter targetName="clean-all" isVisible="false" />
- <filter targetName="init" isVisible="false" />
- <filter targetName="runSender" isVisible="false" />
- <filter targetName="all" isVisible="true" />
- </targetFilters>
- <treeView value="true" />
- <verbose value="true" />
- <viewClosedWhenNoErrors value="false" />
- </buildFile>
</component>
<component name="editorHistoryManager">
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server0/client-jndi.properties">
+ <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/readme.html">
<provider selected="true" editor-type-id="text-editor">
- <state line="14" column="43" selection-start="722" selection-end="722" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="46" column="44" selection-start="2573" selection-end="3415" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-jms.xml">
+ <entry file="file://$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml">
<provider selected="true" editor-type-id="text-editor">
- <state line="17" column="29" selection-start="891" selection-end="908" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="836" column="101" selection-start="47929" selection-end="47929" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/MDBQueueB.java">
+ <entry file="file://$PROJECT_DIR$/docs/user-manual/en/examples.xml">
<provider selected="true" editor-type-id="text-editor">
- <state line="36" column="13" selection-start="1596" selection-end="1596" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="549" column="37" selection-start="34357" selection-end="34357" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-config/src/org/hornetq/javaee/example/server/MDBQueueA.java">
+ <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server/ra.xml">
<provider selected="true" editor-type-id="text-editor">
- <state line="36" column="13" selection-start="1593" selection-end="1593" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="61" column="27" selection-start="2184" selection-end="3025" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-config/readme.html">
+ <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-configuration.xml">
<provider selected="true" editor-type-id="text-editor">
- <state line="149" column="19" selection-start="9467" selection-end="9815" vertical-scroll-proportion="-16.956522">
- <folding />
- </state>
+ <state line="46" column="49" selection-start="1871" selection-end="1871" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server/jms-ds.xml">
+ <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/server/MDBQueue.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="34" column="31" selection-start="960" selection-end="1628" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="46" column="25" selection-start="1849" selection-end="1865" vertical-scroll-proportion="0.0" />
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server0/hornetq-configuration.xml">
- <provider selected="true" editor-type-id="text-editor">
- <state line="46" column="49" selection-start="1976" selection-end="1976" vertical-scroll-proportion="0.0">
- <folding />
- </state>
- </provider>
- </entry>
<entry file="file://$PROJECT_DIR$/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/StatelessSenderService.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="23" column="17" selection-start="806" selection-end="806" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="23" column="17" selection-start="806" selection-end="806" vertical-scroll-proportion="0.0" />
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/examples/javaee/jca-config/src/org/hornetq/javaee/example/server2/StatelessSender.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="50" column="59" selection-start="1556" selection-end="1609" vertical-scroll-proportion="0.0">
- <folding />
- </state>
+ <state line="50" column="59" selection-start="1556" selection-end="1609" vertical-scroll-proportion="0.0" />
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/examples/javaee/mdb-bmt/src/org/hornetq/javaee/example/MDB_BMTClientExample.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="71" column="32" selection-start="2239" selection-end="2468" vertical-scroll-proportion="0.0">
+ <state line="71" column="32" selection-start="2239" selection-end="2468" vertical-scroll-proportion="0.0" />
+ </provider>
+ </entry>
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="704" column="12" selection-start="19472" selection-end="19472" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/server/ra.xml">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/PageSubscription.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="61" column="27" selection-start="2184" selection-end="3025" vertical-scroll-proportion="0.0">
+ <state line="38" column="0" selection-start="1213" selection-end="1213" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/MDBRemoteServerClientExample.java">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="95" column="17" selection-start="3054" selection-end="3226" vertical-scroll-proportion="0.0">
- <folding>
- <element signature="imports" expanded="true" />
- </folding>
+ <state line="46" column="13" selection-start="1822" selection-end="1822" vertical-scroll-proportion="0.0">
+ <folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/src/org/hornetq/javaee/example/server/MDBQueue.java">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="46" column="25" selection-start="1849" selection-end="1865" vertical-scroll-proportion="0.0">
- <folding>
- <element signature="imports" expanded="true" />
- </folding>
+ <state line="65" column="13" selection-start="2716" selection-end="2716" vertical-scroll-proportion="0.0">
+ <folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/examples/javaee/jca-remote/readme.html">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/HornetQComponent.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="46" column="44" selection-start="2573" selection-end="3415" vertical-scroll-proportion="-16.304348">
+ <state line="28" column="8" selection-start="929" selection-end="929" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/docs/user-manual/en/appserver-integration.xml">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="836" column="101" selection-start="48141" selection-end="48141" vertical-scroll-proportion="-15.04">
+ <state line="592" column="40" selection-start="18885" selection-end="18908" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/docs/user-manual/en/examples.xml">
+ <entry file="file://$PROJECT_DIR$/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="534" column="124" selection-start="33920" selection-end="33920" vertical-scroll-proportion="0.4304762">
+ <state line="368" column="21" selection-start="11960" selection-end="11960" vertical-scroll-proportion="0.35629454">
<folding />
</state>
</provider>
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -53,6 +53,7 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -367,7 +368,7 @@
// ConnectionLifeCycleListener implementation --------------------------------------------------
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -42,6 +42,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -68,7 +69,7 @@
this.interceptors = interceptors;
}
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
final Configuration config = server.getConfiguration();
@@ -178,13 +179,13 @@
final boolean isCC = msg.isClusterConnection();
- server.getClusterManager().addClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
{
- server.getClusterManager().removeClusterTopologyListener(listener, isCC);
+ acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
}
});
}
@@ -205,7 +206,8 @@
{
log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
- server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
+
+ acceptorUsed.getClusterConnection().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -38,6 +38,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.UUIDGenerator;
@@ -109,7 +110,7 @@
// ProtocolManager implementation --------------------------------
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
StompConnection conn = new StompConnection(connection, this);
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -54,16 +55,21 @@
private volatile boolean started;
private final ExecutorFactory executorFactory;
+
+ private final ClusterConnection clusterConnection;
private boolean paused;
private NotificationService notificationService;
- public InVMAcceptor(final Map<String, Object> configuration,
+ public InVMAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.listener = listener;
@@ -73,6 +79,11 @@
executorFactory = new OrderedExecutorFactory(threadPool);
}
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -189,7 +200,7 @@
throw new IllegalStateException("Acceptor is not started");
}
- new InVMConnection(id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
+ new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
}
public void disconnect(final String connectionID)
@@ -209,6 +220,8 @@
private class Listener implements ConnectionLifeCycleListener
{
+ //private static Listener instance = new Listener();
+
private final InVMConnector connector;
Listener(final InVMConnector connector)
@@ -216,14 +229,14 @@
this.connector = connector;
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -17,6 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class InVMAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool);
+ return new InVMAcceptor(clusterConnection, configuration, handler, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -57,15 +58,17 @@
private volatile boolean closing;
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor executor)
{
- this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
+ this(acceptor, serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
}
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final String id,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -81,7 +84,7 @@
this.executor = executor;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
public void close()
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -172,19 +172,20 @@
final ConnectionLifeCycleListener listener,
final Executor serverExecutor)
{
- return new InVMConnection(id, handler, listener, serverExecutor);
+ // No acceptor on a client connection
+ return new InVMConnection(null, id, handler, listener, serverExecutor);
}
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -87,6 +88,8 @@
{
static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ private ClusterConnection clusterConnection;
+
private ChannelFactory channelFactory;
private volatile ChannelGroup serverChannelGroup;
@@ -158,6 +161,7 @@
private final long batchDelay;
private final boolean directDeliver;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -166,6 +170,21 @@
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
+ this(null, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+ }
+
+
+ public NettyAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
+ final BufferHandler handler,
+ final BufferDecoder decoder,
+ final ConnectionLifeCycleListener listener,
+ final Executor threadPool,
+ final ScheduledExecutorService scheduledThreadPool)
+ {
+
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.decoder = decoder;
@@ -618,6 +637,14 @@
{
this.notificationService = notificationService;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.Acceptor#getClusterConnection()
+ */
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
// Inner classes -----------------------------------------------------------------------------
@@ -633,7 +660,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
+ new NettyConnection(NettyAcceptor.this, e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -662,14 +689,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, NettyAcceptor.this.protocol);
+ listener.connectionCreated(acceptor, connection, NettyAcceptor.this.protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -18,6 +18,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class NettyAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection connection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+ return new NettyAcceptor(connection, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -21,6 +21,7 @@
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.ReadyListener;
@@ -72,6 +73,15 @@
boolean batchingEnabled,
boolean directDeliver)
{
+ this(null, channel, listener, batchingEnabled, directDeliver);
+ }
+
+ public NettyConnection(final Acceptor acceptor,
+ final Channel channel,
+ final ConnectionLifeCycleListener listener,
+ boolean batchingEnabled,
+ boolean directDeliver)
+ {
this.channel = channel;
this.listener = listener;
@@ -80,7 +90,7 @@
this.directDeliver = directDeliver;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
// Public --------------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -495,7 +496,8 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
+ // No acceptor on a client connection
+ NettyConnection conn = new NettyConnection(null, ch, new Listener(), !httpEnabled && batchDelay > 0, false);
return conn;
}
@@ -689,7 +691,7 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -39,6 +39,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -94,6 +95,8 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
+
+ private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -101,7 +104,8 @@
// Constructors --------------------------------------------------
- public RemotingServiceImpl(final Configuration config,
+ public RemotingServiceImpl(final ClusterManager clusterManager,
+ final Configuration config,
final HornetQServer server,
final ManagementService managementService,
final ScheduledExecutorService scheduledThreadPool)
@@ -109,6 +113,8 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
+
+ this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
@@ -202,7 +208,9 @@
ProtocolManager manager = protocolMap.get(protocol);
- Acceptor acceptor = factory.createAcceptor(info.getParams(),
+ // TODO: parameterize the cluster connection
+ Acceptor acceptor = factory.createAcceptor(clusterManager.getDefaultConnection(),
+ info.getParams(),
new DelegatingBufferHandler(),
manager,
this,
@@ -370,7 +378,7 @@
return protocolMap.get(protocol);
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (server == null)
{
@@ -384,7 +392,7 @@
throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+ ConnectionEntry entry = pmgr.createConnectionEntry(acceptor, connection);
if (isTrace)
{
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -15,9 +15,11 @@
import java.util.Map;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
@@ -37,7 +39,13 @@
String getNodeID();
HornetQServer getServer();
+
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
+ void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+
/**
* @return a Map of node ID and addresses
*/
@@ -47,8 +55,14 @@
TransportConfiguration getConnector();
+ Topology getTopology();
+
void flushExecutor();
// for debug
String describe();
+
+ void announceNode();
+
+ void announceBackup();
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -16,11 +16,7 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.server.HornetQComponent;
@@ -37,21 +33,19 @@
Map<String, Bridge> getBridges();
Set<ClusterConnection> getClusterConnections();
+
+ /**
+ * Return the default ClusterConnection to be used in case one is not defined by the acceptor
+ * @return the default cluster connection
+ */
+ ClusterConnection getDefaultConnection();
ClusterConnection getClusterConnection(SimpleString name);
Set<BroadcastGroup> getBroadcastGroups();
-
- void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
- void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-
void activate();
- void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
-
- Topology getTopology();
-
void flushExecutor();
void announceBackup() throws Exception;
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -33,6 +33,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.AfterConnectInternalListener;
@@ -46,6 +48,7 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -79,8 +82,6 @@
private final ExecutorFactory executorFactory;
- private final Topology clusterManagerTopology;
-
private final Executor executor;
private final HornetQServer server;
@@ -140,9 +141,17 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
+
+
+ // Stuff that used to be on the ClusterManager
+
+ private final Topology topology = new Topology(this);
+
+ private volatile ServerLocatorInternal backupServerLocator;
+
+
public ClusterConnectionImpl(final ClusterManagerInternal manager,
- final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
@@ -204,6 +213,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -227,8 +238,6 @@
this.callTimeout = callTimeout;
- this.clusterManagerTopology = clusterManagerTopology;
-
clusterConnector = new StaticClusterConnector(tcConfigs);
if (tcConfigs != null && tcConfigs.length > 0)
@@ -244,7 +253,6 @@
}
public ClusterConnectionImpl(final ClusterManagerImpl manager,
- final Topology clusterManagerTopology,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -308,6 +316,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -330,11 +340,9 @@
clusterConnector = new DiscoveryClusterConnector(dg);
this.manager = manager;
-
- this.clusterManagerTopology = clusterManagerTopology;
}
- public void start() throws Exception
+ public void start() throws Exception
{
synchronized (this)
{
@@ -410,13 +418,21 @@
props);
managementService.sendNotification(notification);
}
+
+
executor.execute(new Runnable()
{
public void run()
{
synchronized (ClusterConnectionImpl.this)
{
+ if (backupServerLocator != null)
+ {
+ backupServerLocator.close();
+ backupServerLocator = null;
+ }
+
if (serverLocator != null)
{
serverLocator.close();
@@ -430,12 +446,98 @@
started = false;
}
+
+ public void announceBackup()
+ {
+ this.backupServerLocator = clusterConnector.createServerLocator();
+
+ backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
+
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+ }
+ ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
+ log.info("backup announced");
+ }
+ //backupSessionFactory.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup, retrying", e);
+ }
+ }
+ });
+ }
+
+ private TopologyMember getLocalMember()
+ {
+ return topology.getMember(manager.getNodeId());
+ }
+
+ public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ {
+ topology.addClusterTopologyListener(listener);
+
+ // no need to use an executor here since the Topology is already using one
+ topology.sendTopology(listener);
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ {
+ topology.removeClusterTopologyListener(listener);
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean backup)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
+ }
+
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
+ {
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
+ }
+ else
+ {
+ topology.updateMember(uniqueEventID, nodeID, newMember);
+ }
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
*/
public void onConnection(ClientSessionFactoryInternal sf)
{
- TopologyMember localMember = manager.getLocalMember();
+ TopologyMember localMember = getLocalMember();
sf.sendNodeAnnounce(localMember.getUniqueEventID(),
manager.getNodeId(),
false,
@@ -498,7 +600,25 @@
}
backup = false;
+
+ topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
+ if (backupServerLocator != null)
+ {
+ // todo we could use the topology of this to preempt it arriving from the cc
+ try
+ {
+ backupServerLocator.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("problem closing backup session factory", e);
+ }
+ backupServerLocator = null;
+ }
+
+
+
serverLocator = clusterConnector.createServerLocator();
if (serverLocator != null)
@@ -509,7 +629,7 @@
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
}
- final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+ final TopologyMember currentMember = topology.getMember(manager.getNodeId());
if (currentMember == null)
{
@@ -701,7 +821,26 @@
}
}
}
+
+ public synchronized void announceNode()
+ {
+ String nodeID = server.getNodeID().toString();
+
+ TopologyMember localMember;
+
+ if (backup)
+ {
+ localMember = new TopologyMember(null, connector);
+ }
+ else
+ {
+ localMember = new TopologyMember(connector, null);
+ }
+ topology.updateAsLive(nodeID, localMember);
+ }
+
+
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
@@ -709,7 +848,7 @@
final Queue queue,
final boolean start) throws Exception
{
- final ServerLocatorInternal targetLocator = new ServerLocatorImpl(clusterManagerTopology, false, connector);
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
String nodeId;
@@ -1415,7 +1554,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+ return new ServerLocatorImpl(topology, true, tcConfigs);
}
else
{
@@ -1445,7 +1584,7 @@
public ServerLocatorInternal createServerLocator()
{
- return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+ return new ServerLocatorImpl(topology, true, dg);
}
}
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -30,15 +30,10 @@
import java.util.concurrent.ScheduledFuture;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -46,7 +41,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -83,6 +77,8 @@
private final PostOffice postOffice;
private final ScheduledExecutorService scheduledExecutor;
+
+ private ClusterConnection defaultClusterConnection;
private final ManagementService managementService;
@@ -99,10 +95,6 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
- private final Topology topology = new Topology(this);
-
- private volatile ServerLocatorInternal backupServerLocator;
-
private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
private final Executor executor;
@@ -126,8 +118,6 @@
executor = executorFactory.getExecutor();;
- topology.setExecutor(executor);
-
this.server = server;
this.postOffice = postOffice;
@@ -152,7 +142,6 @@
out.println("Information on " + this);
out.println("*******************************************************");
- out.println("Topology: " + topology.describe("Toopology on " + this));
for (ClusterConnection conn : this.clusterConnections.values())
{
@@ -163,17 +152,17 @@
return str.toString();
}
+
+ public ClusterConnection getDefaultConnection()
+ {
+ return defaultClusterConnection;
+ }
public String toString()
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
- public TopologyMember getLocalMember()
- {
- return topology.getMember(nodeUUID.toString());
- }
-
public String getNodeId()
{
return nodeUUID.toString();
@@ -193,36 +182,19 @@
deployBroadcastGroup(config);
}
- String connectorName = null;
-
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- if (connectorName == null)
- {
- connectorName = config.getConnectorName();
- break;
- }
- }
+ deployClusterConnection(config);
+ }
- if (connectorName != null)
+ for (ClusterConnection conn : clusterConnections.values())
{
- TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
- if (nodeConnector == null)
+ conn.announceNode();
+ if (backup)
{
- log.warn("No connecor with name '" + connectorName +
- "'. The cluster connection will not be deployed.");
- return;
+ conn.announceBackup();
}
-
- // Now announce presence
- announceNode(nodeConnector);
-
- for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
- {
- deployClusterConnection(config);
- }
}
-
}
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -267,12 +239,6 @@
}
bridges.clear();
-
- if (backupServerLocator != null)
- {
- backupServerLocator.close();
- backupServerLocator = null;
- }
}
for (ServerLocatorInternal clusterLocator : clusterLocators)
@@ -292,28 +258,6 @@
clusterConnections.clear();
}
- public void nodeAnnounced(final long uniqueEventID,
- final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean backup)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
- }
-
- TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
- newMember.setUniqueEventID(uniqueEventID);
- if (backup)
- {
- topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
- }
- else
- {
- topology.updateMember(uniqueEventID, nodeID, newMember);
- }
- }
-
public void flushExecutor()
{
Future future = new Future();
@@ -350,24 +294,6 @@
return clusterConnections.get(name.toString());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
- {
- topology.addClusterTopologyListener(listener);
-
- // no need to use an executor here since the Topology is already using one
- topology.sendTopology(listener);
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
- {
- topology.removeClusterTopologyListener(listener);
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
// backup node becomes live
public synchronized void activate()
{
@@ -375,27 +301,6 @@
{
backup = false;
- String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
- // swap backup as live and send it to everybody
- member = new TopologyMember(member.getConnector().b, null);
- topology.updateAsLive(nodeID, member);
-
- if (backupServerLocator != null)
- {
- // todo we could use the topology of this to preempt it arriving from the cc
- try
- {
- backupServerLocator.close();
- }
- catch (Exception e)
- {
- log.warn("problem closing backup session factory", e);
- }
- backupServerLocator = null;
- }
-
for (BroadcastGroup broadcastGroup : broadcastGroups.values())
{
try
@@ -432,31 +337,15 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
- topology.sendMember(nodeID);
}
}
public void announceBackup() throws Exception
{
- List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
- if (!configs.isEmpty())
+ for (ClusterConnection conn : this.clusterConnections.values())
{
- ClusterConnectionConfiguration config = configs.get(0);
-
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
-
- if (connector == null)
- {
- log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced.");
- return;
- }
- announceBackup(config, connector);
+ conn.announceBackup();
}
- else
- {
- log.warn("no cluster connections defined, unable to announce backup");
- }
}
public void addClusterLocator(final ServerLocatorInternal serverLocator)
@@ -468,112 +357,7 @@
{
this.clusterLocators.remove(serverLocator);
}
-
- private synchronized void announceNode(final TransportConfiguration nodeConnector)
- {
- String nodeID = server.getNodeID().toString();
-
- TopologyMember localMember;
- if (backup)
- {
- localMember = new TopologyMember(null, nodeConnector);
- }
- else
- {
- localMember = new TopologyMember(nodeConnector, null);
- }
-
- topology.updateAsLive(nodeID, localMember);
- }
-
- private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
- {
- if (broadcastGroups.containsKey(config.getName()))
- {
- ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
- " deployed. This one will not be deployed.");
-
- return;
- }
-
- InetAddress localAddress = null;
- if (config.getLocalBindAddress() != null)
- {
- localAddress = InetAddress.getByName(config.getLocalBindAddress());
- }
-
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
- BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
- config.getName(),
- localAddress,
- config.getLocalBindPort(),
- groupAddress,
- config.getGroupPort(),
- !backup);
-
- for (String connectorInfo : config.getConnectorInfos())
- {
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
-
- if (connector == null)
- {
- logWarnNoConnector(config.getName(), connectorInfo);
-
- return;
- }
-
- group.addConnector(connector);
- }
-
- ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
- 0L,
- config.getBroadcastPeriod(),
- MILLISECONDS);
-
- group.setScheduledFuture(future);
-
- broadcastGroups.put(config.getName(), group);
-
- managementService.registerBroadcastGroup(group, config);
-
- if (!backup)
- {
- group.start();
- }
- }
-
- private void logWarnNoConnector(final String connectorName, final String bgName)
- {
- ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
- "'. The broadcast group with name '" +
- bgName +
- "' will not be deployed.");
- }
-
- private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
- {
- TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- connectorNames.size());
- int count = 0;
- for (String connectorName : connectorNames)
- {
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
-
- if (connector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
- "'. The bridge will not be deployed.");
-
- return null;
- }
-
- tcConfigs[count++] = connector;
- }
-
- return tcConfigs;
- }
-
+
public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -726,11 +510,42 @@
bridge.flushExecutor();
}
- private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+ // for testing
+ public void clear()
{
+ for (Bridge bridge : bridges.values())
+ {
+ try
+ {
+ bridge.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ bridges.clear();
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ clusterConnections.clear();
+ }
+
+ // Private methods ----------------------------------------------------------------------------------------------------
+
+ private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+ {
if (config.getName() == null)
{
- ClusterManagerImpl.log.warn("Must specify a unique name for each cluster. This one will not be deployed.");
+ ClusterManagerImpl.log.warn("Must specify a unique name for each cluster connection. This one will not be deployed.");
return;
}
@@ -781,7 +596,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
dg,
connector,
new SimpleString(config.getName()),
@@ -819,7 +633,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
tcConfigs,
connector,
new SimpleString(config.getName()),
@@ -847,6 +660,11 @@
config.isAllowDirectConnectionsOnly());
}
+ if (defaultClusterConnection == null)
+ {
+ defaultClusterConnection = clusterConnection;
+ }
+
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
@@ -856,74 +674,8 @@
log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
clusterConnection.start();
-
- if (backup)
- {
- announceBackup(config, connector);
- }
}
-
- private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
- {
- if (config.getStaticConnectors() != null)
- {
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
-
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
- "'. The cluster connection will not be deployed.");
- }
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else
- {
- return;
- }
- log.info("announcing backup");
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterManagerImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
- }
- ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- if (backupSessionFactory != null)
- {
- backupSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new NodeAnnounceMessage(System.currentTimeMillis(),
- nodeUUID.toString(),
- true,
- connector,
- null));
- log.info("backup announced");
- }
- }
- catch (Exception e)
- {
- log.warn("Unable to announce backup, retrying", e);
- }
- }
- });
- }
-
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
@@ -945,32 +697,94 @@
return transformer;
}
- // for testing
- public void clear()
+
+ private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
- for (Bridge bridge : bridges.values())
+ if (broadcastGroups.containsKey(config.getName()))
{
- try
+ ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
+ " deployed. This one will not be deployed.");
+
+ return;
+ }
+
+ InetAddress localAddress = null;
+ if (config.getLocalBindAddress() != null)
+ {
+ localAddress = InetAddress.getByName(config.getLocalBindAddress());
+ }
+
+ InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+ BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
+ config.getName(),
+ localAddress,
+ config.getLocalBindPort(),
+ groupAddress,
+ config.getGroupPort(),
+ !backup);
+
+ for (String connectorInfo : config.getConnectorInfos())
+ {
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
+
+ if (connector == null)
{
- bridge.stop();
+ logWarnNoConnector(config.getName(), connectorInfo);
+
+ return;
}
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- }
+
+ group.addConnector(connector);
}
- bridges.clear();
- for (ClusterConnection clusterConnection : clusterConnections.values())
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+ 0L,
+ config.getBroadcastPeriod(),
+ MILLISECONDS);
+
+ group.setScheduledFuture(future);
+
+ broadcastGroups.put(config.getName(), group);
+
+ managementService.registerBroadcastGroup(group, config);
+
+ if (!backup)
{
- try
+ group.start();
+ }
+ }
+
+ private void logWarnNoConnector(final String connectorName, final String bgName)
+ {
+ ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
+ "'. The broadcast group with name '" +
+ bgName +
+ "' will not be deployed.");
+ }
+
+ private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
+ {
+ TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ connectorNames.size());
+ int count = 0;
+ for (String connectorName : connectorNames)
+ {
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
+
+ if (connector == null)
{
- clusterConnection.stop();
+ ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
+ "'. The bridge will not be deployed.");
+
+ return null;
}
- catch (Exception e)
- {
- e.printStackTrace();
- }
+
+ tcConfigs[count++] = connector;
}
- clusterConnections.clear();
+
+ return tcConfigs;
}
+
+
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -30,8 +29,6 @@
void removeClusterLocator(ServerLocatorInternal locator);
- TopologyMember getLocalMember();
-
String getNodeId();
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -1414,8 +1414,6 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration);
- remotingService = new RemotingServiceImpl(configuration, this, managementService, scheduledPool);
-
if (configuration.getMemoryMeasureInterval() != -1)
{
memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(),
@@ -1470,6 +1468,20 @@
configuration.isPersistIDCache(),
addressSettingsRepository);
+ // This can't be created until node id is set
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ this,
+ postOffice,
+ scheduledPool,
+ managementService,
+ configuration,
+ nodeManager.getUUID(),
+ configuration.isBackup(),
+ configuration.isClustered());
+
+
+ remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
+
messagingServerControl = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -1527,18 +1539,6 @@
deploySecurityFromConfiguration();
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-
- // This can't be created until node id is set
- clusterManager = new ClusterManagerImpl(executorFactory,
- this,
- postOffice,
- scheduledPool,
- managementService,
- configuration,
- nodeManager.getUUID(),
- configuration.isBackup(),
- configuration.isClustered());
-
}
/*
@@ -1604,10 +1604,10 @@
// We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
// it is activated
+ clusterManager.start();
+
remotingService.start();
- clusterManager.start();
-
initialised = true;
}
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.protocol;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
@@ -26,7 +27,7 @@
*/
public interface ProtocolManager extends BufferDecoder
{
- ConnectionEntry createConnectionEntry(Connection connection);
+ ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
public void removeHandler(final String name);
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
/**
@@ -31,6 +32,11 @@
void pause();
/**
+ * @return the cluster connection associated with this Acceptor
+ */
+ ClusterConnection getClusterConnection();
+
+ /**
* Set the notification service for this acceptor to use.
*
* @param notificationService the notification service
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -18,6 +18,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
+
/**
* A factory for creating acceptors.
* <p/>
@@ -40,7 +42,8 @@
* @param scheduledThreadPool a scheduled thread pool
* @return an acceptor
*/
- Acceptor createAcceptor(final Map<String, Object> configuration,
+ Acceptor createAcceptor(ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
BufferHandler handler,
BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Modified: branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -23,11 +23,13 @@
public interface ConnectionLifeCycleListener
{
/**
- * called when a connection is created.
+ * This method is used both by client connector creation and server connection creation through acceptors.
+ * The acceptor will be null on client operations.
*
+ * @param acceptor the acceptor through which the connection was created; always null on a client connection created event.
* @param connection the connection that has been created
*/
- void connectionCreated(Connection connection, ProtocolType protocol);
+ void connectionCreated(Acceptor acceptor, Connection connection, ProtocolType protocol);
/**
* called when a connection is destroyed.
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -940,10 +940,10 @@
for (ClusterConnection cc : clusterManager.getClusterConnections())
{
out += cc.describe() + "\n";
+ out += cc.getTopology().describe();
}
}
out += "\n\nfull topology:";
- out += clusterManager.getTopology().describe();
return out + br;
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -131,10 +131,6 @@
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
-
- System.out.println(servers[0].getClusterManager().getTopology().describe());
-
- System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -153,17 +153,6 @@
waitForTopology(servers[1], 3);
waitForTopology(servers[2], 3);
- for (int i = 0 ; i < 3; i++)
- {
- System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 2; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -196,12 +185,6 @@
startServers(0, 1);
- for (int i = 0; i <= 1; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -266,13 +249,6 @@
for (int i = 0; i <= 4; i++)
{
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 4; i++)
- {
setupSessionFactory(i, isNetty());
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -137,8 +137,6 @@
startServers(0, 1);
waitForTopology(servers[0], 2);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
- System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
for (int i = 0; i < 10; i++)
@@ -148,7 +146,6 @@
log.info("#stop #test #" + i);
stopServers(1);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -193,7 +193,7 @@
{
if (server != null)
{
- log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+ log.info("failed topology, Topology on server = " + server.getClusterManager().describe());
}
}
assertTrue("expected " + topologyMembers + " members", ok);
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -98,8 +98,6 @@
Thread.sleep(500);
servers.get(0).crash(session);
- System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
-
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -29,6 +29,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -544,7 +545,7 @@
latch = connCreatedLatch;
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
this.connection = connection;
if (latch != null)
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -89,7 +89,7 @@
*/
public MockConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
- super(serverID, handler, listener, Executors.newSingleThreadExecutor());
+ super(null, serverID, handler, listener, Executors.newSingleThreadExecutor());
}
@Override
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -63,7 +63,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
@@ -74,7 +74,8 @@
};
- Acceptor acceptor = factory.createAcceptor(params,
+ Acceptor acceptor = factory.createAcceptor(null,
+ params,
handler,
null,
listener,
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -28,6 +28,7 @@
import org.hornetq.core.remoting.impl.netty.NettyAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -80,7 +81,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnection;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.tests.util.RandomUtil;
@@ -220,7 +221,7 @@
class MyListener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -66,7 +67,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
public void connectionReadyForWrites(Object connectionID, boolean ready)
@@ -106,7 +107,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 02:28:00 UTC (rev 11357)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 02:31:55 UTC (rev 11358)
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.management.MBeanServer;
@@ -42,6 +43,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -112,8 +114,15 @@
log.debug("waiting for " + nodes + " on the topology for server = " + server);
long start = System.currentTimeMillis();
+
+ Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
+
+ if (ccs.size() != 1)
+ {
+ throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
+ }
- Topology topology = server.getClusterManager().getTopology();
+ Topology topology = ccs.iterator().next().getTopology();
do
{
13 years, 3 months
JBoss hornetq SVN: r11357 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-16 22:28:00 -0400 (Fri, 16 Sep 2011)
New Revision: 11357
Added:
branches/Branch_2_2_EAP_cluster4/
Log:
new branch
13 years, 3 months
JBoss hornetq SVN: r11356 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-16 11:45:09 -0400 (Fri, 16 Sep 2011)
New Revision: 11356
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-16 15:45:09 UTC (rev 11356)
@@ -42,6 +42,7 @@
public HornetQStompException(String msg, Throwable t)
{
super(msg, t);
+ this.body = t.getMessage();
}
public HornetQStompException(Throwable t)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-16 15:45:09 UTC (rev 11356)
@@ -140,7 +140,12 @@
public void acknowledge(String messageID, String subscriptionID) throws Exception
{
long id = Long.parseLong(messageID);
- long consumerID = messagesToAck.remove(id);
+ Long consumerID = messagesToAck.remove(id);
+
+ if (consumerID == null)
+ {
+ throw new HornetQStompException("failed to ack because no message with id: " + id);
+ }
StompSubscription sub = subscriptions.get(consumerID);
if (subscriptionID != null)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-16 15:45:09 UTC (rev 11356)
@@ -104,6 +104,13 @@
log.error("---------------postprocessed response: " + response);
}
+ else
+ {
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response.addHeader(Stomp.Headers.Response.RECEIPT_ID, request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+ }
return response;
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 15:45:09 UTC (rev 11356)
@@ -690,7 +690,78 @@
Assert.assertNotNull(message);
}
+ public void testErrorWithReceipt() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub2");
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ connV11.disconnect();
+
+ //message should still be there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong message id
+ ackFrame.addHeader("subscription", "sub1");
+ ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ connV11.disconnect();
+
+ //message should still be there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");
13 years, 3 months
JBoss hornetq SVN: r11355 - branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-16 10:39:57 -0400 (Fri, 16 Sep 2011)
New Revision: 11355
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 10:28:52 UTC (rev 11354)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 14:39:57 UTC (rev 11355)
@@ -608,7 +608,98 @@
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
+
+
+ public void testAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub1", messageID);
+
+ connV11.disconnect();
+
+ //Ack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", messageID);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testAckWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", "someother");
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ connV11.disconnect();
+
+ //message should still be there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+
+ private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+
+ conn.sendFrame(ackFrame);
+ }
+
private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("NACK");
13 years, 3 months
JBoss hornetq SVN: r11354 - branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-16 06:28:52 -0400 (Fri, 16 Sep 2011)
New Revision: 11354
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more test
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 06:00:24 UTC (rev 11353)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 10:28:52 UTC (rev 11354)
@@ -19,6 +19,11 @@
import java.io.IOException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import junit.framework.Assert;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
@@ -466,7 +471,6 @@
public void testHeartBeat2() throws Exception
{
//heart-beat (1,1)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
ClientStompFrame frame = connV11.createFrame("CONNECT");
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
@@ -524,7 +528,106 @@
connV11.disconnect();
}
+
+ public void testNack() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub1", messageID);
+
+ connV11.disconnect();
+
+ //Nack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testNackWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", messageID);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testNackWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", "someother");
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ connV11.disconnect();
+
+ //message should still be there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("NACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+
+ conn.sendFrame(ackFrame);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", ack);
+
+ conn.sendFrame(subFrame);
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11353 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-16 02:00:24 -0400 (Fri, 16 Sep 2011)
New Revision: 11353
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-16 05:20:30 UTC (rev 11352)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-16 06:00:24 UTC (rev 11353)
@@ -493,7 +493,7 @@
return frame;
}
- //server heart beat (20,100) (hard coded)
+ //server heart beat
//algorithm:
//(a) server ping: if server hasn't sent any frame within serverPing
//interval, send a ping.
@@ -501,7 +501,7 @@
// 2*serverAcceptPing, disconnect!
private class HeartBeater extends Thread
{
- final int MIN_SERVER_PING = 200;
+ final int MIN_SERVER_PING = 500;
final int MIN_CLIENT_PING = 500;
long serverPing = 0;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-16 05:20:30 UTC (rev 11352)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-16 06:00:24 UTC (rev 11353)
@@ -94,6 +94,19 @@
if (frame.needsReply())
{
response = receiveFrame();
+
+ //filter out server ping
+ while (response != null)
+ {
+ if (response.getCommand().equals("STOMP"))
+ {
+ response = receiveFrame();
+ }
+ else
+ {
+ break;
+ }
+ }
}
return response;
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 05:20:30 UTC (rev 11352)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 06:00:24 UTC (rev 11353)
@@ -462,6 +462,69 @@
}
+ //server ping
+ public void testHeartBeat2() throws Exception
+ {
+ //heart-beat (1,1)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,1");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("500,500", reply.getHeader("heart-beat"));
+
+ connV11.disconnect();
+
+ //heart-beat (500,1000)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("1000,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(10000);
+
+ //now check the frame size
+ int size = connV11.getFrameQueueSize();
+
+ System.out.println("ping received: " + size);
+
+ assertTrue(size > 5);
+
+ //the pinger keeps the connection alive, so the server should not have disconnected us
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.stopPinger();
+
+ connV11.disconnect();
+
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11352 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-16 01:20:30 -0400 (Fri, 16 Sep 2011)
New Revision: 11352
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -99,7 +99,14 @@
if (heartBeat != null)
{
handleHeartBeat(heartBeat);
- response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,100");
+ if (heartBeater == null)
+ {
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
+ }
+ else
+ {
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue());
+ }
}
}
else
@@ -147,7 +154,18 @@
public StompFrame onDisconnect(StompFrame frame)
{
log.error("----------------- frame: " + frame);
-
+ if (this.heartBeater != null)
+ {
+ heartBeater.shutdown();
+ try
+ {
+ heartBeater.join();
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Interrupted while waiting for heart beater to die", e);
+ }
+ }
return null;
}
@@ -371,7 +389,11 @@
@Override
public StompFrame onStomp(StompFrame request)
{
- return onConnect(request);
+ if (!connection.isValid())
+ {
+ return onConnect(request);
+ }
+ return null;
}
@Override
@@ -479,6 +501,9 @@
// 2*serverAcceptPing, disconnect!
private class HeartBeater extends Thread
{
+ final int MIN_SERVER_PING = 200;
+ final int MIN_CLIENT_PING = 500;
+
long serverPing = 0;
long serverAcceptPing = 0;
volatile boolean shutdown = false;
@@ -490,15 +515,26 @@
{
if (clientPing != 0)
{
- serverAcceptPing = clientPing > 100 ? clientPing : 100;
+ serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING;
}
if (clientAcceptPing != 0)
{
- serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+ serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
}
}
+ public synchronized void shutdown()
+ {
+ shutdown = true;
+ this.notify();
+ }
+
+ public String getServerHeartBeatValue()
+ {
+ return String.valueOf(serverPing) + "," + String.valueOf(serverAcceptPing);
+ }
+
public void pinged()
{
lastPingTime.set(System.currentTimeMillis());
@@ -537,6 +573,9 @@
if (serverAcceptPing != 0)
{
dur2 = System.currentTimeMillis() - lastAccepted.get();
+
+ log.error("-------------------------- dur2 is " + dur2);
+
if (dur2 > (2 * serverAcceptPing))
{
connection.disconnect();
@@ -545,24 +584,51 @@
}
}
- long waitTime1 = serverPing - dur1;
- long waitTime2 = serverAcceptPing * 2 - dur2;
-
- long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+ long waitTime1 = 0;
+ long waitTime2 = 0;
+ if (serverPing > 0)
+ {
+ waitTime1 = serverPing - dur1;
+ }
+
+ if (serverAcceptPing > 0)
+ {
+ waitTime2 = serverAcceptPing * 2 - dur2;
+ }
+
+ long waitTime = 10l;
+
+ if ((waitTime1 > 0) && (waitTime1 > 0))
+ {
+ waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+ }
+ else if (waitTime1 > 0)
+ {
+ waitTime = waitTime1;
+ }
+ else if (waitTime2 > 0)
+ {
+ waitTime = waitTime2;
+ }
+
try
{
+ log.error("-------------------waiting for " + waitTime);
this.wait(waitTime);
+ log.error("--------------------wake up " );
}
catch (InterruptedException e)
{
}
}
+ log.error("-------------------------HeartBeat thread shut down!");
}
}
public void pingAccepted()
{
+ log.error("------------------------Ping accepted!");
this.lastAccepted.set(System.currentTimeMillis());
}
}
@@ -572,6 +638,7 @@
{
if (heartBeater != null)
{
+ log.error("----------------------PING accepted: " + request);
heartBeater.pingAccepted();
}
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -195,5 +195,10 @@
{
return version;
}
+
+ public int getFrameQueueSize()
+ {
+ return this.frameQueue.size();
+ }
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -20,14 +20,23 @@
*/
public class ClientStompFrameV11 extends AbstractClientStompFrame
{
+ boolean forceOneway = false;
+
public ClientStompFrameV11(String command)
{
super(command);
}
+
+ public void setForceOneway()
+ {
+ forceOneway = true;
+ }
@Override
public boolean needsReply()
{
+ if (forceOneway) return false;
+
if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT))
{
return true;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -37,6 +37,13 @@
String getVersion();
ClientStompFrame createFrame(String command);
+
+ //number of frames in the queue
+ int getFrameQueueSize();
+
+ void startPinger(long interval);
+
+ void stopPinger();
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -65,4 +65,18 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public void startPinger(long interval)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stopPinger()
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -27,6 +27,8 @@
public static final String HOST_HEADER = "host";
public static final String VERSION_HEADER = "version";
public static final String RECEIPT_HEADER = "receipt";
+
+ private Pinger pinger;
public StompClientConnectionV11(String host, int port) throws IOException
{
@@ -93,6 +95,7 @@
@Override
public void disconnect() throws IOException, InterruptedException
{
+ stopPinger();
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
frame.addHeader("receipt", "1");
@@ -114,4 +117,81 @@
return new ClientStompFrameV11(command);
}
+ @Override
+ public void startPinger(long interval)
+ {
+ pinger = new Pinger(interval);
+ pinger.startPing();
+ }
+
+ @Override
+ public void stopPinger()
+ {
+ if (pinger != null)
+ {
+ pinger.stopPing();
+ try
+ {
+ pinger.join();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ pinger = null;
+ }
+ }
+
+ private class Pinger extends Thread
+ {
+ long pingInterval;
+ ClientStompFrameV11 pingFrame;
+ volatile boolean stop = false;
+
+ public Pinger(long interval)
+ {
+ this.pingInterval = interval;
+ pingFrame = (ClientStompFrameV11) createFrame("STOMP");
+ pingFrame.setBody("\n");
+ pingFrame.setForceOneway();
+ }
+
+ public void startPing()
+ {
+ start();
+ }
+
+ public synchronized void stopPing()
+ {
+ stop = true;
+ this.notify();
+ }
+
+ public void run()
+ {
+ synchronized (this)
+ {
+ while (!stop)
+ {
+ try
+ {
+ System.out.println("============sending ping");
+
+ sendFrame(pingFrame);
+
+ System.out.println("Pinged " + pingFrame);
+
+ this.wait(pingInterval);
+ }
+ catch (Exception e)
+ {
+ stop = true;
+ e.printStackTrace();
+ }
+ }
+ System.out.println("Pinger stopped");
+ }
+ }
+ }
+
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16 05:20:30 UTC (rev 11352)
@@ -17,6 +17,8 @@
*/
package org.hornetq.tests.integration.stomp.v11;
+import java.io.IOException;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
@@ -350,6 +352,116 @@
newConn.disconnect();
}
+
+ public void testHeartBeat() throws Exception
+ {
+ //no heart beat at all if the heart-beat header is absent
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //no heart beat for (0,0)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "0,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,0", reply.getHeader("heart-beat"));
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //heart-beat (1,0), should receive a min client ping accepted by server
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will fail
+ try
+ {
+ connV11.sendFrame(frame);
+ fail("connection should have been destroyed by now");
+ }
+ catch (IOException e)
+ {
+ //ignore
+ }
+
+ //heart-beat (1,0), start a ping, then send a message, should be ok.
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(2000);
+
+ //the pinger keeps the connection alive, so the server should not have disconnected us
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.stopPinger();
+
+ connV11.disconnect();
+
+ }
+
}
13 years, 3 months
JBoss hornetq SVN: r11351 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10 and 3 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-15 10:34:00 -0400 (Thu, 15 Sep 2011)
New Revision: 11351
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -27,6 +27,7 @@
private List<Header> headers = new ArrayList<Header>(10);
private String body;
private VersionedStompFrameHandler handler;
+ private boolean disconnect;
public HornetQStompException(StompConnection connection, String msg)
{
@@ -85,6 +86,7 @@
frame = handler.createStompFrame("ERROR");
frame.addHeader("message", this.getMessage());
}
+ frame.setNeedsDisconnect(disconnect);
return frame;
}
@@ -99,4 +101,9 @@
this.val = val;
}
}
+
+ public void setDisconnect(boolean b)
+ {
+ disconnect = b;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -371,8 +371,6 @@
{
String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
- log.error("----------------- acceptVersion: " + acceptVersion);
-
if (acceptVersion == null)
{
this.version = StompVersions.V1_0;
@@ -401,6 +399,7 @@
error.addHeader("version", acceptVersion);
error.addHeader("content-type", "text/plain");
error.setBody("Supported protocol version are " + manager.getSupportedVersionsAsString());
+ error.setDisconnect(true);
throw error;
}
log.error("------------------ negotiated version is " + this.version);
@@ -438,11 +437,11 @@
stompListener.requestAccepted(request);
}
+ String cmd = request.getCommand();
try
{
if (!initialized)
{
- String cmd = request.getCommand();
if ( ! (Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd)))
{
throw new HornetQStompException("Connection hasn't been established.");
@@ -461,6 +460,11 @@
{
sendFrame(reply);
}
+
+ if (Stomp.Commands.DISCONNECT.equals(cmd))
+ {
+ this.disconnect();
+ }
}
public void sendFrame(StompFrame frame)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -115,6 +115,8 @@
public static final byte U = (byte)'U';
public static final byte N = (byte)'N';
+
+ public static final byte LN = (byte)'n';
public static final byte HEADER_SEPARATOR = (byte)':';
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -177,11 +177,15 @@
public String getEscapedKey()
{
+ log.error("----------------key is : |" + key + "|");
+ log.error("----------------esc'd: |" + escape(key) + "|");
return escape(key);
}
public String getEscapedValue()
{
+ log.error("----------------val is : |" + val + "|");
+ log.error("----------------esc'd v: |" + escape(val) + "|");
return escape(val);
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -134,10 +134,7 @@
return receipt;
}
- public StompFrame postprocess(StompFrame request)
- {
- return null;
- }
+ public abstract StompFrame postprocess(StompFrame request);
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -104,7 +104,6 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- connection.destroy();
return null;
}
@@ -398,5 +397,20 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public StompFrame postprocess(StompFrame request)
+ {
+ StompFrame response = null;
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ response.setNeedsDisconnect(true);
+ }
+ }
+ return response;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -387,7 +387,7 @@
StompSubscription subscription, int deliveryCount)
throws Exception
{
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+ StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
if (subscription.getID() != null)
{
@@ -403,7 +403,7 @@
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0 ? (data.length - 1) : data.length));
buffer.readBytes(data);
}
else
@@ -417,7 +417,6 @@
{
data = new byte[0];
}
- frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
}
frame.setByteBody(data);
@@ -426,6 +425,8 @@
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+ log.error("-------------------- frame created: " + frame);
+
return frame;
}
@@ -888,6 +889,19 @@
break;
}
+ case StompDecoder.LN:
+ {
+ if (isEscaping)
+ {
+ holder.append(StompDecoder.NEW_LINE);
+ isEscaping = false;
+ }
+ else
+ {
+ holder.append(b);
+ }
+ break;
+ }
case StompDecoder.NEW_LINE:
{
if (decoder.whiteSpaceOnly)
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -71,6 +71,8 @@
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
+
+ log.error("------------------------_______now head: " + head);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
if (bytesBody != null)
@@ -91,8 +93,12 @@
if (!headers.containsKey(key))
{
headers.put(key, val);
+ allHeaders.add(new Header(key, val));
}
- allHeaders.add(new Header(key, val));
+ else if (!key.equals(Stomp.Headers.CONTENT_LENGTH))
+ {
+ allHeaders.add(new Header(key, val));
+ }
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -94,9 +94,15 @@
public void disconnect() throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ frame.addHeader("receipt", "1");
ClientStompFrame result = this.sendFrame(frame);
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ throw new IOException("Disconnect failed! " + result);
+ }
+
close();
connected = false;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -12,8 +12,12 @@
*/
package org.hornetq.tests.integration.stomp.util;
+import java.io.UnsupportedEncodingException;
import java.util.StringTokenizer;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompDecoder;
+
/**
*
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
@@ -40,11 +44,17 @@
@Override
- public ClientStompFrame createFrame(String data)
+ public ClientStompFrame createFrame(final String data)
{
+ System.out.println("Data: |" + data + "|");
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
+ System.out.println("DataFields[0] |" + dataFields[0]);
+ if (dataFields.length > 1)
+ {
+ System.out.println("DataFields[1] |" + dataFields[1]);
+ }
StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
String command = tokenizer.nextToken();
@@ -53,7 +63,8 @@
while (tokenizer.hasMoreTokens())
{
String header = tokenizer.nextToken();
- String[] fields = header.split(":");
+ System.out.println("header is: " + header);
+ String[] fields = splitHeader(header);
frame.addHeader(fields[0], fields[1]);
}
@@ -64,7 +75,111 @@
}
return frame;
}
+
+   /**
+    * Splits a raw header line into key and value at the first unescaped ':'.
+    * <p>
+    * Escape sequences are decoded while scanning: a doubled backslash yields one
+    * backslash, backslash + ':' yields a literal colon and backslash + 'n' yields
+    * a line feed. A backslash in front of any other character is dropped and the
+    * character itself is kept. A trailing lone backslash is ignored.
+    * <p>
+    * Only the first unescaped ':' acts as separator; further unescaped colons are
+    * kept as part of the value instead of being discarded.
+    *
+    * @param header one header line without its line terminator
+    * @return a two element array: [0] the decoded key, [1] the decoded value
+    *         (empty when the line holds no unescaped ':')
+    */
+   private String[] splitHeader(String header)
+   {
+      StringBuilder key = new StringBuilder();
+      StringBuilder value = new StringBuilder();
+      // destination of decoded characters; flips from key to value at the separator
+      StringBuilder current = key;
+      boolean escaped = false;
+
+      for (int i = 0; i < header.length(); i++)
+      {
+         char c = header.charAt(i);
+
+         if (escaped)
+         {
+            // only 'n' is translated after a backslash, everything else is taken literally
+            current.append(c == 'n' ? '\n' : c);
+            // always leave escape mode so an unknown sequence cannot affect the next character
+            escaped = false;
+         }
+         else if (c == '\\')
+         {
+            escaped = true;
+         }
+         else if (c == ':' && current == key)
+         {
+            // first unescaped colon: everything after it belongs to the value
+            current = value;
+         }
+         else
+         {
+            // plain character, including any later unescaped ':' inside the value
+            current.append(c);
+         }
+      }
+
+      return new String[] { key.toString(), value.toString() };
+   }
+
@Override
public ClientStompFrame newFrame(String command)
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15 14:34:00 UTC (rev 11351)
@@ -96,7 +96,7 @@
//reply headers: version, session, server
assertEquals(null, reply.getHeader("version"));
-
+
connV11.disconnect();
// case 2 accept-version=1.0, result: 1.0
@@ -164,7 +164,6 @@
System.out.println("Got error frame " + reply);
- connV11.disconnect();
}
public void testSendAndReceive() throws Exception
@@ -303,6 +302,54 @@
newConn.disconnect();
}
+   /**
+    * Sends a message whose custom header key and value contain escaped
+    * backslash, line feed and colon characters, then receives it through a
+    * second connection and checks that the header arrives decoded.
+    */
+   public void testHeaderEncoding() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+
+      String body = "Hello World 1!";
+      String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "application/xml");
+      frame.addHeader("content-length", cLen);
+      // on the wire: an escaped backslash, an escaped line feed and an escaped colon
+      String hKey = "special-header\\\\\\n\\:";
+      String hVal = "\\:\\\\\\ngood";
+      frame.addHeader(hKey, hVal);
+
+      System.out.println("key: |" + hKey + "| val: |" + hVal);
+
+      frame.setBody(body);
+
+      connV11.sendFrame(frame);
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      newConn.sendFrame(subFrame);
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      assertEquals("MESSAGE", frame.getCommand());
+
+      // the received key and value must be the decoded forms of hKey and hVal
+      String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
+
+      assertEquals(":" + "\\" + "\n" + "good", value);
+
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      // actually send the frame; previously it was built and then discarded
+      newConn.sendFrame(unsubFrame);
+
+      newConn.disconnect();
+   }
}
13 years, 3 months