Author: ataylor
Date: 2011-06-14 11:05:40 -0400 (Tue, 14 Jun 2011)
New Revision: 10802
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
bridge, added pause method
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -137,4 +137,6 @@
ServerLocator getServerLocator();
CoreRemotingConnection getConnection();
+
+ boolean isClosed();
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -451,7 +451,12 @@
closed = true;
}
- public ServerLocator getServerLocator()
+ public boolean isClosed()
+ {
+ return closed;
+ }
+
+ public ServerLocator getServerLocator()
{
return serverLocator;
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -57,4 +57,8 @@
void setNotificationService(NotificationService notificationService);
RemotingConnection getForwardingConnection();
+
+ void pause() throws Exception;
+
+ void resume() throws Exception;
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -40,4 +40,7 @@
void reset() throws Exception;
+ void pause() throws Exception;
+
+ boolean isPaused();
}
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -291,8 +291,36 @@
}
}
- public boolean isStarted()
+ public void pause() throws Exception
{
+ log.info("Bridge " + this.name + " being paused");
+
+ executor.execute(new PauseRunnable());
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ BridgeImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+ }
+ }
+ }
+
+ public void resume() throws Exception
+ {
+ queue.addConsumer(BridgeImpl.this);
+ queue.deliverAsync();
+ }
+
+ public boolean isStarted()
+ {
return started;
}
@@ -596,21 +624,21 @@
try
{
- csf = createSessionFactory();
- // Session is pre-acknowledge
- session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
-
- try
+ if (csf == null || csf.isClosed())
{
- session.addMetaData("Session-for-bridge", name.toString());
- session.addMetaData("nodeUUID", nodeUUID.toString());
+ csf = createSessionFactory();
+ // Session is pre-acknowledge
+ session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);try
+ {
+ session.addMetaData("Session-for-bridge", name.toString());
+ session.addMetaData("nodeUUID", nodeUUID.toString());
+ }
+ catch (Throwable dontCare)
+ {
+ // addMetaData here is just for debug purposes
+ }
}
- catch (Throwable dontCare)
- {
- // addMetaData here is just for debug purposes
- }
-
if (forwardingAddress != null)
{
BindingQuery query = null;
@@ -778,6 +806,40 @@
}
}
+ private class PauseRunnable implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ synchronized (BridgeImpl.this)
+ {
+ log.debug("Closing Session for bridge " + BridgeImpl.this.name);
+
+ started = false;
+
+ active = false;
+
+ }
+
+ queue.removeConsumer(BridgeImpl.this);
+
+ cancelRefs();
+
+ if (queue != null)
+ {
+ queue.deliverAsync();
+ }
+
+ log.info("paused bridge " + name);
+ }
+ catch (Exception e)
+ {
+ BridgeImpl.log.error("Failed to pause bridge", e);
+ }
+ }
+ }
+
private class CreateObjectsRunnable implements Runnable
{
public synchronized void run()
Modified:
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-14
14:05:34 UTC (rev 10801)
+++
branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-14
15:05:40 UTC (rev 10802)
@@ -466,7 +466,7 @@
{
log.trace("Closing clustering record " + record);
}
- record.close();
+ record.pause();
}
catch (Exception e)
{
@@ -545,7 +545,7 @@
else
{
log.info("Reattaching nodeID=" + nodeID);
- if (record.isClosed())
+ if (record.isPaused())
{
record.resume();
}
@@ -714,6 +714,8 @@
private volatile boolean isClosed = false;
+ private volatile boolean paused = false;
+
private volatile boolean firstReset = false;
public MessageFlowRecordImpl(final String nodeID,
@@ -781,13 +783,24 @@
bridge.stop();
}
-
- public void resume() throws Exception
+
+ public void pause() throws Exception
{
- isClosed = false;
- this.bridge = createBridge(this);
- bridge.start();
+ paused = true;
+ clearBindings();
+ bridge.pause();
}
+
+ public boolean isPaused()
+ {
+ return paused;
+ }
+
+ public void resume() throws Exception
+ {
+ paused = false;
+ bridge.resume();
+ }
public boolean isClosed()
{
@@ -799,7 +812,8 @@
clearBindings();
}
- public void setBridge(final Bridge bridge)
+
+ public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
}