[hornetq-commits] JBoss hornetq SVN: r8010 - in branches/Replication_Clebert: src/main/org/hornetq/core/server and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 29 17:16:46 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-09-29 17:16:46 -0400 (Tue, 29 Sep 2009)
New Revision: 8010

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
   branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes..

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-09-29 21:16:46 UTC (rev 8010)
@@ -50,7 +50,7 @@
     */
    public void handlePacket(Packet packet)
    {
-
+      System.out.println("packet = " + packet);
    }
 
    /* (non-Javadoc)

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-09-29 21:16:46 UTC (rev 8010)
@@ -34,7 +34,7 @@
 
    // Attributes ----------------------------------------------------
 
-   // TODO: Should this be configurable or not?
+   // TODO: where should this be configured?
    private static final int WINDOW_SIZE = 100 * 1024;
 
    private final ConnectionManager connectionManager;
@@ -65,8 +65,7 @@
     */
    public void replicate(byte[] bytes, ReplicationToken token)
    {
-      // TODO Auto-generated method stub
-
+      replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
    }
 
    /* (non-Javadoc)
@@ -105,12 +104,18 @@
     */
    public void stop() throws Exception
    {
-      replicatingChannel.close();
+      if (replicatingChannel != null)
+      {
+         replicatingChannel.close();
+      }
 
       this.started = false;
+      
+      if (connection != null)
+      {
+         connection.destroy();
+      }
 
-      connection.destroy();
-
       connection = null;
    }
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java	2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java	2009-09-29 21:16:46 UTC (rev 8010)
@@ -19,6 +19,7 @@
 import javax.management.MBeanServer;
 
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -70,7 +71,7 @@
 
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
    
-   ReplicationEndpoint createReplicationEndpoint();
+   ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
 
    CreateSessionResponseMessage createSession(String name,
                                               long channelID,                                              

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-09-29 21:16:46 UTC (rev 8010)
@@ -593,12 +593,16 @@
       return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
    }
    
-   public synchronized ReplicationEndpoint createReplicationEndpoint()
+   public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
    {
+      if (!configuration.isBackup())
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
+      }
+      
       if (replicationEndpoint == null)
       {
          replicationEndpoint = new ReplicationEndpointImpl(this);
-         
       }
       return replicationEndpoint;
    }

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-09-29 21:16:46 UTC (rev 8010)
@@ -33,6 +33,7 @@
 import org.hornetq.core.client.impl.ConnectionManagerImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -40,11 +41,9 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Interceptor;
-import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -59,6 +58,7 @@
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.ResourceManager;
@@ -82,8 +82,6 @@
 
    // Attributes ----------------------------------------------------
 
-   private RemotingService remoting;
-
    private ThreadFactory tFactory;
 
    private ExecutorService executor;
@@ -100,11 +98,82 @@
 
    public void testBasicConnection() throws Exception
    {
-      ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
-      manager.start();
-      manager.stop();
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(true);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         manager.start();
+         manager.stop();
+      }
+      finally
+      {
+         server.stop();
+      }
    }
 
+   public void testConnectIntoNonBackup() throws Exception
+   {
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(false);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         try
+         {
+            manager.start();
+            fail("Exception was expected");
+         }
+         catch (HornetQException expected)
+         {
+         }
+
+         manager.stop();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   public void testSendPackets() throws Exception
+   {
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(true);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+         manager.start();
+         manager.replicate(new byte[]{3}, null);
+         manager.stop();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
    // Package protected ---------------------------------------------
    class LocalRemotingServiceImpl extends RemotingServiceImpl
    {
@@ -137,7 +206,7 @@
 
    protected void setUp() throws Exception
    {
-      Configuration config = createDefaultConfig(false);
+      super.setUp();
 
       tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
 
@@ -145,10 +214,6 @@
 
       scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
 
-      remoting = new LocalRemotingServiceImpl(config, new FakeServer(), null, null, executor, scheduledExecutor, 0);
-
-      remoting.start();
-
       TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
                                                                           new HashMap<String, Object>(),
                                                                           randomString());
@@ -175,16 +240,14 @@
 
    protected void tearDown() throws Exception
    {
-      remoting.stop();
-
       executor.shutdown();
 
       scheduledExecutor.shutdown();
 
-      remoting = null;
-
       tFactory = null;
 
+      connectionManager = null;
+
       scheduledExecutor = null;
    }
 



More information about the hornetq-commits mailing list