[hornetq-commits] JBoss hornetq SVN: r10802 - in branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq: core/client/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 14 11:05:40 EDT 2011


Author: ataylor
Date: 2011-06-14 11:05:40 -0400 (Tue, 14 Jun 2011)
New Revision: 10802

Modified:
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
bridge, added pause method

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/api/core/client/ClientSessionFactory.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -137,4 +137,6 @@
    ServerLocator getServerLocator();
    
    CoreRemotingConnection getConnection();
+
+    boolean isClosed();
 }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -451,7 +451,12 @@
       closed = true;
    }
 
-   public ServerLocator getServerLocator()
+    public boolean isClosed()
+    {
+        return closed;
+    }
+
+    public ServerLocator getServerLocator()
    {
       return serverLocator;
    }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/Bridge.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -57,4 +57,8 @@
    void setNotificationService(NotificationService notificationService);
 
    RemotingConnection getForwardingConnection();
+
+   void pause() throws Exception;
+
+   void resume() throws Exception;
 }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -40,4 +40,7 @@
 
    void reset() throws Exception;
 
+   void pause() throws Exception;
+
+    boolean isPaused();
 }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -291,8 +291,36 @@
       }
    }
 
-   public boolean isStarted()
+   public void pause() throws Exception
    {
+      log.info("Bridge " + this.name + " being paused");
+
+      executor.execute(new PauseRunnable());
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), name);
+         Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            BridgeImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+         }
+      }
+   }
+
+    public void resume() throws Exception
+    {
+        queue.addConsumer(BridgeImpl.this);
+        queue.deliverAsync();
+    }
+
+    public boolean isStarted()
+   {
       return started;
    }
 
@@ -596,21 +624,21 @@
 
          try
          {
-            csf = createSessionFactory();
-            // Session is pre-acknowledge
-            session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
-            
-            try
+            if (csf == null || csf.isClosed())
             {
-               session.addMetaData("Session-for-bridge", name.toString());
-               session.addMetaData("nodeUUID", nodeUUID.toString());
+                csf = createSessionFactory();
+                // Session is pre-acknowledge
+                session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);try
+                {
+                   session.addMetaData("Session-for-bridge", name.toString());
+                   session.addMetaData("nodeUUID", nodeUUID.toString());
+                }
+                catch (Throwable dontCare)
+                {
+                   // addMetaData here is just for debug purposes
+                }
             }
-            catch (Throwable dontCare)
-            {
-               // addMetaData here is just for debug purposes
-            }
 
-
             if (forwardingAddress != null)
             {
                BindingQuery query = null;
@@ -778,6 +806,40 @@
       }
    }
 
+   private class PauseRunnable implements Runnable
+   {
+      public void run()
+      {
+         try
+         {
+            synchronized (BridgeImpl.this)
+            {
+               log.debug("Closing Session for bridge " + BridgeImpl.this.name);
+
+               started = false;
+
+               active = false;
+
+            }
+
+            queue.removeConsumer(BridgeImpl.this);
+
+            cancelRefs();
+
+            if (queue != null)
+            {
+               queue.deliverAsync();
+            }
+
+            log.info("paused bridge " + name);
+         }
+         catch (Exception e)
+         {
+            BridgeImpl.log.error("Failed to pause bridge", e);
+         }
+      }
+   }
+
    private class CreateObjectsRunnable implements Runnable
    {
       public synchronized void run()

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-14 14:05:34 UTC (rev 10801)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-14 15:05:40 UTC (rev 10802)
@@ -466,7 +466,7 @@
             {
                log.trace("Closing clustering record " + record);
             }
-            record.close();
+            record.pause();
          }
          catch (Exception e)
          {
@@ -545,7 +545,7 @@
             else
             {
                log.info("Reattaching nodeID=" + nodeID);
-               if (record.isClosed())
+               if (record.isPaused())
                {
                   record.resume();
                }
@@ -714,6 +714,8 @@
       
       private volatile boolean isClosed = false;
 
+      private volatile boolean paused = false;
+
       private volatile boolean firstReset = false;
 
       public MessageFlowRecordImpl(final String nodeID,
@@ -781,13 +783,24 @@
          
          bridge.stop();
       }
-      
-      public void resume() throws Exception
+
+      public void pause() throws Exception
       {
-         isClosed = false;
-         this.bridge = createBridge(this);
-         bridge.start();
+           paused = true;
+           clearBindings();
+           bridge.pause();
       }
+
+       public boolean isPaused()
+       {
+           return paused;
+       }
+
+       public void resume() throws Exception
+      {
+         paused = false;
+         bridge.resume();
+      }
       
       public boolean isClosed()
       {
@@ -799,7 +812,8 @@
          clearBindings();
       }
 
-      public void setBridge(final Bridge bridge)
+
+       public void setBridge(final Bridge bridge)
       {
          this.bridge = bridge;
       }



More information about the hornetq-commits mailing list