[hornetq-commits] JBoss hornetq SVN: r8010 - in branches/Replication_Clebert: src/main/org/hornetq/core/server and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Sep 29 17:16:46 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-09-29 17:16:46 -0400 (Tue, 29 Sep 2009)
New Revision: 8010
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
changes..
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -50,7 +50,7 @@
*/
public void handlePacket(Packet packet)
{
-
+ System.out.println("packet = " + packet);
}
/* (non-Javadoc)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -34,7 +34,7 @@
// Attributes ----------------------------------------------------
- // TODO: Should this be configurable or not?
+ // TODO: where should this be configured?
private static final int WINDOW_SIZE = 100 * 1024;
private final ConnectionManager connectionManager;
@@ -65,8 +65,7 @@
*/
public void replicate(byte[] bytes, ReplicationToken token)
{
- // TODO Auto-generated method stub
-
+ replicatingChannel.send(new CreateReplicationSessionMessage(1, 1));
}
/* (non-Javadoc)
@@ -105,12 +104,18 @@
*/
public void stop() throws Exception
{
- replicatingChannel.close();
+ if (replicatingChannel != null)
+ {
+ replicatingChannel.close();
+ }
this.started = false;
+
+ if (connection != null)
+ {
+ connection.destroy();
+ }
- connection.destroy();
-
connection = null;
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/HornetQServer.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -19,6 +19,7 @@
import javax.management.MBeanServer;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -70,7 +71,7 @@
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
- ReplicationEndpoint createReplicationEndpoint();
+ ReplicationEndpoint createReplicationEndpoint() throws HornetQException;
CreateSessionResponseMessage createSession(String name,
long channelID,
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -593,12 +593,16 @@
return new CreateSessionResponseMessage(true, version.getIncrementingVersion());
}
- public synchronized ReplicationEndpoint createReplicationEndpoint()
+ public synchronized ReplicationEndpoint createReplicationEndpoint() throws HornetQException
{
+ if (!configuration.isBackup())
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
+ }
+
if (replicationEndpoint == null)
{
replicationEndpoint = new ReplicationEndpointImpl(this);
-
}
return replicationEndpoint;
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29 19:23:15 UTC (rev 8009)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-09-29 21:16:46 UTC (rev 8010)
@@ -33,6 +33,7 @@
import org.hornetq.core.client.impl.ConnectionManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -40,11 +41,9 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
-import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
@@ -59,6 +58,7 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -82,8 +82,6 @@
// Attributes ----------------------------------------------------
- private RemotingService remoting;
-
private ThreadFactory tFactory;
private ExecutorService executor;
@@ -100,11 +98,82 @@
public void testBasicConnection() throws Exception
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
- manager.start();
- manager.stop();
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ manager.start();
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
}
+ public void testConnectIntoNonBackup() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(false);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ try
+ {
+ manager.start();
+ fail("Exception was expected");
+ }
+ catch (HornetQException expected)
+ {
+ }
+
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ public void testSendPackets() throws Exception
+ {
+
+ Configuration config = createDefaultConfig(false);
+
+ config.setBackup(true);
+
+ HornetQServer server = new HornetQServerImpl(config);
+
+ server.start();
+
+ try
+ {
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager);
+ manager.start();
+ manager.replicate(new byte[]{3}, null);
+ manager.stop();
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
// Package protected ---------------------------------------------
class LocalRemotingServiceImpl extends RemotingServiceImpl
{
@@ -137,7 +206,7 @@
protected void setUp() throws Exception
{
- Configuration config = createDefaultConfig(false);
+ super.setUp();
tFactory = new HornetQThreadFactory("HornetQ-ReplicationTest", false);
@@ -145,10 +214,6 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
- remoting = new LocalRemotingServiceImpl(config, new FakeServer(), null, null, executor, scheduledExecutor, 0);
-
- remoting.start();
-
TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
new HashMap<String, Object>(),
randomString());
@@ -175,16 +240,14 @@
protected void tearDown() throws Exception
{
- remoting.stop();
-
executor.shutdown();
scheduledExecutor.shutdown();
- remoting = null;
-
tFactory = null;
+ connectionManager = null;
+
scheduledExecutor = null;
}
More information about the hornetq-commits mailing list