JBoss hornetq SVN: r10893 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-28 11:21:14 -0400 (Tue, 28 Jun 2011)
New Revision: 10893
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Trigger replication from 'live'
(which currently fails, as the existing code requires replication to be started from the backup).
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
@@ -18,8 +18,19 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
@@ -57,13 +68,13 @@
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
- private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+ private final Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
private TransportConfiguration[] initialConnectors;
- private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
- private StaticConnector staticConnector = new StaticConnector();
+ private final StaticConnector staticConnector = new StaticConnector();
private Topology topology = new Topology();
@@ -158,14 +169,14 @@
private boolean backup;
private final Exception e = new Exception();
-
+
// To be called when there are ServerLocator being finalized.
// To be used on test assertions
public static Runnable finalizeCallback = null;
-
+
public static synchronized void clearThreadPools()
{
-
+
if (globalThreadPool != null)
{
globalThreadPool.shutdown();
@@ -184,7 +195,7 @@
globalThreadPool = null;
}
}
-
+
if (globalScheduledThreadPool != null)
{
globalScheduledThreadPool.shutdown();
@@ -1318,8 +1329,9 @@
}
}
- class StaticConnector implements Serializable
+ final class StaticConnector implements Serializable
{
+ private static final long serialVersionUID = 959566606042156036L;
private List<Connector> connectors;
public ClientSessionFactory connect() throws HornetQException
@@ -1344,14 +1356,14 @@
try
{
-
+
List<Future<ClientSessionFactory>> futuresList = new ArrayList<Future<ClientSessionFactory>>();
-
+
for (Connector conn : connectors)
{
futuresList.add(threadPool.submit(conn));
}
-
+
for (int i = 0, futuresSize = futuresList.size(); i < futuresSize; i++)
{
Future<ClientSessionFactory> future = futuresList.get(i);
@@ -1359,7 +1371,9 @@
{
csf = future.get();
if (csf != null)
+ {
break;
+ }
}
catch (Exception e)
{
@@ -1415,6 +1429,7 @@
}
}
+ @Override
public void finalize() throws Throwable
{
if (!closed && finalizeCheck)
@@ -1423,7 +1438,7 @@
System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created here:", e);
-
+
if (ServerLocatorImpl.finalizeCallback != null)
{
ServerLocatorImpl.finalizeCallback.run();
@@ -1437,7 +1452,7 @@
class Connector implements Callable<ClientSessionFactory>
{
- private TransportConfiguration initialConnector;
+ private final TransportConfiguration initialConnector;
private volatile ClientSessionFactoryInternal factory;
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-28 15:21:14 UTC (rev 10893)
@@ -54,6 +54,9 @@
private final List<Interceptor> interceptors;
+ private final Map<String, ServerSessionPacketHandler> sessionHandlers =
+ new ConcurrentHashMap<String, ServerSessionPacketHandler>();
+
public CoreProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
@@ -153,8 +156,18 @@
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg + " connector=" + msg.getConnector());
System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+ try
+ {
+ server.addHaBackup(msg.getConnector());
+ }
+ catch (Exception e)
+ {
+ // XXX This is not what we want
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
}
});
@@ -164,8 +177,6 @@
return entry;
}
- private final Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
-
public ServerSessionPacketHandler getSessionHandler(final String sessionName)
{
return sessionHandlers.get(sessionName);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-06-28 15:21:14 UTC (rev 10893)
@@ -20,6 +20,7 @@
import javax.management.MBeanServer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
@@ -49,23 +50,23 @@
* This interface defines the internal interface of the HornetQ Server exposed to other components of the server. The
* external management interface of the HornetQ Server is defined by the HornetQServerManagement interface This
* interface is never exposed outside the HornetQ server, e.g. by JMX or other means
- *
+ *
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
*/
public interface HornetQServer extends HornetQComponent
{
-
+
void setIdentity(String identity);
-
+
String getIdentity();
-
+
Configuration getConfiguration();
RemotingService getRemotingService();
StorageManager getStorageManager();
-
+
PagingManager getPagingManager();
ManagementService getManagementService();
@@ -75,12 +76,12 @@
MBeanServer getMBeanServer();
Version getVersion();
-
+
NodeManager getNodeManager();
/**
* Returns the resource to manage this HornetQ server.
- *
+ *
* Using this control will throw IllegalStateException if the
* server is not properly started.
*/
@@ -90,7 +91,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- /** The journal at the backup server has to be equivalent as the journal used on the live node.
+ /** The journal at the backup server has to be equivalent as the journal used on the live node.
* Or else the backup node is out of sync. */
ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
@@ -143,31 +144,31 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
-
+
Queue locateQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
ScheduledExecutorService getScheduledPool();
-
+
ExecutorFactory getExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
GroupingHandler getGroupingHandler();
-
+
ReplicationEndpoint getReplicationEndpoint();
-
+
ReplicationManager getReplicationManager();
- boolean checkActivate() throws Exception;
-
+ boolean checkActivate() throws Exception;
+
void deployDivert(DivertConfiguration config) throws Exception;
void destroyDivert(SimpleString name) throws Exception;
ConnectorsService getConnectorsService();
-
+
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
@@ -175,4 +176,10 @@
ServerSession getSessionByID(String sessionID);
void stop(boolean failoverOnServerShutdown) throws Exception;
+
+ /**
+ * @param connector
+ * @throws Exception
+ */
+ void addHaBackup(TransportConfiguration connector) throws Exception;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28 15:19:24 UTC (rev 10892)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
@@ -41,8 +41,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -85,6 +87,7 @@
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
@@ -119,6 +122,7 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -209,6 +213,8 @@
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
+ private final Set<String> sharedNothingBackups = new ConcurrentHashSet<String>();
+
private final Object initialiseLock = new Object();
private boolean initialised;
@@ -576,6 +582,8 @@
private Activation activation;
+ private ServerLocator serverLocator;
+
public synchronized void start() throws Exception
{
initialiseLogging();
@@ -1247,31 +1255,23 @@
// Private
// --------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // // get list of backup names!
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector =
- // configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
- // "' is not defined in the configuration.");
- // return false;
- // }
- // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
- // threadPool, scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
- // replicationManager.start();
- // }
- //
- // return true;
- // }
+ private boolean startReplication(TransportConfiguration connector) throws Exception
+ {
+ assert !configuration.isSharedStore();
+ if (configuration.isSharedStore() || connector == null)
+ {
+ return true;
+ }
+ serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+ ClientSessionFactoryInternal replicationFailoverManager =
+ (ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+ replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
+ replicationManager.start();
+
+ return true;
+ }
+
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1942,6 +1942,12 @@
return "HornetQServerImpl::" + (identity == null ? "" : (identity + ", ")) + (nodeManager != null ? ("serverUUID=" + nodeManager.getUUID()) : "");
}
- // Inner classes
- // --------------------------------------------------------------------------------
+ @Override
+ public void addHaBackup(TransportConfiguration connector) throws Exception
+ {
+ log.info(connector + " " + connector.getFactoryClassName() + " " + connector.getParams() + " " +
+ replicationManager);
+ startReplication(connector);
+ // throw new UnsupportedOperationException("unimplemented");
+ }
}
13 years, 6 months
JBoss hornetq SVN: r10892 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-28 11:19:24 -0400 (Tue, 28 Jun 2011)
New Revision: 10892
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Improve tests: set an identity on each server, because the nodeId is the same for backup and live.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-28 15:18:21 UTC (rev 10891)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-28 15:19:24 UTC (rev 10892)
@@ -170,7 +170,7 @@
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
TransportConfiguration tc = getConnectorTransportConfiguration(true);
- config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
//liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);
@@ -180,6 +180,7 @@
backupConfig = config1;
backupServer = createBackupServer();
+ backupServer.getServer().setIdentity("id_backup");
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
@@ -189,6 +190,7 @@
config0.setSharedStore(false);
liveConfig = config0;
liveServer = createLiveServer();
+ liveServer.getServer().setIdentity("id_live");
liveServer.start();
backupServer.start();
13 years, 6 months
JBoss hornetq SVN: r10891 - branches/HORNETQ-720_Replication/etc.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-28 11:18:21 -0400 (Tue, 28 Jun 2011)
New Revision: 10891
Modified:
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs
Log:
More Eclipse settings adjustments (mostly reducing the number of 'save actions').
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-06-28 15:18:04 UTC (rev 10890)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-06-28 15:18:21 UTC (rev 10891)
@@ -1,4 +1,4 @@
-#Thu Jun 23 17:28:54 CEST 2011
+#Mon Jun 27 17:03:43 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -26,8 +26,8 @@
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=16
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=82
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=82
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=82
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=80
+org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=18
+org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
org.eclipse.jdt.core.formatter.alignment_for_assignment=16
org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
org.eclipse.jdt.core.formatter.alignment_for_compact_if=82
@@ -106,7 +106,7 @@
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
+org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs 2011-06-28 15:18:04 UTC (rev 10890)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs 2011-06-28 15:18:21 UTC (rev 10891)
@@ -1,4 +1,4 @@
-#Thu Jun 23 17:29:46 CEST 2011
+#Tue Jun 28 17:14:52 CEST 2011
cleanup.add_default_serial_version_id=false
cleanup.add_generated_serial_version_id=true
cleanup.add_missing_annotations=true
@@ -53,8 +53,8 @@
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
-formatter_profile=_HornetQ
-formatter_settings_version=11
+formatter_profile=_'HornetQ'
+formatter_settings_version=12
org.eclipse.jdt.ui.exception.name=e
org.eclipse.jdt.ui.gettersetter.use.is=true
org.eclipse.jdt.ui.ignorelowercasenames=true
@@ -73,7 +73,7 @@
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=false
sp_cleanup.add_serial_version_id=false
-sp_cleanup.always_use_blocks=true
+sp_cleanup.always_use_blocks=false
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=false
sp_cleanup.always_use_this_for_non_static_method_access=false
@@ -109,10 +109,10 @@
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
-sp_cleanup.use_blocks=true
-sp_cleanup.use_blocks_only_for_return_and_throw=false
+sp_cleanup.use_blocks=false
+sp_cleanup.use_blocks_only_for_return_and_throw=true
sp_cleanup.use_parentheses_in_expressions=false
-sp_cleanup.use_this_for_non_static_field_access=true
+sp_cleanup.use_this_for_non_static_field_access=false
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=true
-sp_cleanup.use_this_for_non_static_method_access=true
+sp_cleanup.use_this_for_non_static_method_access=false
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=true
13 years, 6 months
JBoss hornetq SVN: r10890 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-28 11:18:04 -0400 (Tue, 28 Jun 2011)
New Revision: 10890
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Use class objects instead of hard-coded names in strings.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 21:00:44 UTC (rev 10889)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-28 15:18:04 UTC (rev 10890)
@@ -37,9 +37,13 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -165,7 +169,9 @@
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
+ TransportConfiguration tc = getConnectorTransportConfiguration(true);
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME, tc);
+
//liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
@@ -325,7 +331,7 @@
{
if (live)
{
- return new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName());
}
else
{
@@ -333,7 +339,7 @@
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- return new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory", server1Params);
+ return new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), server1Params);
}
}
@@ -341,7 +347,7 @@
{
if (live)
{
- return new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory");
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
}
else
{
@@ -349,7 +355,7 @@
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- return new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", server1Params);
+ return new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), server1Params);
}
}
@@ -357,7 +363,7 @@
{
if (live)
{
- return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory");
+ return new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName());
}
else
{
@@ -366,7 +372,7 @@
server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
- return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory",
+ return new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
server1Params);
}
}
@@ -375,7 +381,7 @@
{
if (live)
{
- return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory");
+ return new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName());
}
else
{
@@ -384,8 +390,7 @@
server1Params.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,
org.hornetq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
- return new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
- server1Params);
+ return new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), server1Params);
}
}
13 years, 6 months
JBoss hornetq SVN: r10889 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-27 17:00:44 -0400 (Mon, 27 Jun 2011)
New Revision: 10889
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Reverting change to what we were supposed to have
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-27 15:31:15 UTC (rev 10888)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-27 21:00:44 UTC (rev 10889)
@@ -588,7 +588,6 @@
{
boolean retry;
int attempts = 0;
- int cfAttempts = 0;
do
{
retry = false;
@@ -611,7 +610,7 @@
threadPool,
scheduledThreadPool,
interceptors);
- factory.connect(1, failoverOnInitialConnection);
+ factory.connect(initialConnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
@@ -619,21 +618,15 @@
factory = null;
if (e.getCode() == HornetQException.NOT_CONNECTED)
{
- cfAttempts ++;
- if (topologyArray != null && cfAttempts == topologyArray.length)
+ attempts++;
+
+ if (topologyArray != null && attempts == topologyArray.length)
{
- attempts++;
- cfAttempts = 0;
- wait(retryInterval);
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
}
- if (topologyArray == null && initialConnectors != null && cfAttempts == initialConnectors.length)
+ if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
{
- attempts++;
- cfAttempts = 0;
- wait(retryInterval);
- }
- if (initialConnectAttempts != -1 && attempts >= initialConnectAttempts)
- {
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
13 years, 6 months
JBoss hornetq SVN: r10888 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-06-27 11:31:15 -0400 (Mon, 27 Jun 2011)
New Revision: 10888
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Added reconnect logic to ServerLocatorImpl.
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-27 14:20:38 UTC (rev 10887)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-27 15:31:15 UTC (rev 10888)
@@ -588,6 +588,7 @@
{
boolean retry;
int attempts = 0;
+ int cfAttempts = 0;
do
{
retry = false;
@@ -610,7 +611,7 @@
threadPool,
scheduledThreadPool,
interceptors);
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ factory.connect(1, failoverOnInitialConnection);
}
catch (HornetQException e)
{
@@ -618,15 +619,21 @@
factory = null;
if (e.getCode() == HornetQException.NOT_CONNECTED)
{
- attempts++;
-
- if (topologyArray != null && attempts == topologyArray.length)
+ cfAttempts ++;
+ if (topologyArray != null && cfAttempts == topologyArray.length)
{
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
+ attempts++;
+ cfAttempts = 0;
+ wait(retryInterval);
}
- if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
+ if (topologyArray == null && initialConnectors != null && cfAttempts == initialConnectors.length)
{
+ attempts++;
+ cfAttempts = 0;
+ wait(retryInterval);
+ }
+ if (initialConnectAttempts != -1 && attempts >= initialConnectAttempts)
+ {
throw new HornetQException(HornetQException.NOT_CONNECTED,
"Cannot connect to server(s). Tried with all available servers.");
}
13 years, 6 months
JBoss hornetq SVN: r10887 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-27 10:20:38 -0400 (Mon, 27 Jun 2011)
New Revision: 10887
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add a HaBackupRegistrationMessage (still not functional)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -108,24 +109,24 @@
else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
{
SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
-
+
public void nodeDown(String nodeID)
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
};
-
+
final boolean isCC = msg.isClusterConnection();
-
+
server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
+
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
@@ -149,15 +150,21 @@
}
server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
}
+ else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ {
+ HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+ System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+ }
}
});
-
-
+
+
return entry;
}
- private Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
+ private final Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
public ServerSessionPacketHandler getSessionHandler(final String sessionName)
{
@@ -179,9 +186,15 @@
}
// This is never called using the core protocol, since we override the HornetQFrameDecoder with our core
- // optimised version HornetQFrameDecoder2, which nevers calls this
+ // optimised version HornetQFrameDecoder2, which never calls this
public int isReadyToHandle(HornetQBuffer buffer)
{
return -1;
}
+
+ @Override
+ public String toString()
+ {
+ return "CoreProtocolManager(server=" + server + ")";
+ }
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -93,6 +93,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -163,7 +164,7 @@
public class PacketDecoder
{
private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
+
public Packet decode(final HornetQBuffer in)
{
final byte packetType = in.readByte();
@@ -524,6 +525,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case PacketImpl.HA_BACKUP_REGISTRATION:
+ {
+ packet = new HaBackupRegistrationMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -149,7 +149,7 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
-
+
public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
@@ -183,17 +183,19 @@
public static final byte REPLICATION_SYNC = 103;
// HA
-
+
public static final byte SESS_ADD_METADATA = 104;
-
+
public static final byte SESS_ADD_METADATA2 = 105;
-
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+ public static final byte HA_BACKUP_REGISTRATION = 113;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -276,7 +278,7 @@
{
return true;
}
-
+
public boolean isAsyncExec()
{
return false;
Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the new backup
+ * node.
+ */
+public class HaBackupRegistrationMessage extends PacketImpl
+{
+
+ private TransportConfiguration connector;
+
+ private String nodeID;
+
+ public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ {
+ this();
+ connector = tc;
+ nodeID = nodeId;
+ }
+
+ public HaBackupRegistrationMessage()
+ {
+ super(HA_BACKUP_REGISTRATION);
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readString();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -80,7 +80,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -521,14 +521,14 @@
initialisePart1();
clusterManager.start();
- // Try-Connect to live server using live-connector-ref
- String liveConnectorsName = configuration.getLiveConnectorName();
- if (liveConnectorsName == null)
+
+
+ String liveConnectorName = configuration.getLiveConnectorName();
+ if (liveConnectorName == null)
{
throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
}
- final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorsName);
- log.info("config is " + config);
+ final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
final ServerLocatorInternal serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
@@ -536,28 +536,30 @@
// sit in loop and try and connect, if server is not live then it will return NOT_LIVE
final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
- if (liveServerSessionFactory != null)
+ if (liveServerSessionFactory == null)
{
- log.debug("announce backup to live-server");
- liveServerSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new NodeAnnounceMessage(getNodeID().toString(), true, config));
- log.info("backup announced");
+ // XXX
+ throw new RuntimeException("Need to retry...");
}
+ log.info("announce backup to live-server (id=" + liveConnectorName + ")");
+ liveServerSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
+ log.info("backup registered");
+
started = true;
log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
"] started, waiting live to fail before it gets active");
-
nodeManager.awaitLiveNode();
+ // Server node (i.e. live node) is not running, now the backup takes over.
- // XXX ???
+ // XXX does this really belong at this point?
+ initialisePart2();
+
configuration.setBackup(false);
- // XXX
-
- initialisePart2();
}
catch (Exception e)
{
@@ -1247,28 +1249,25 @@
// private boolean startReplication() throws Exception
// {
- // String backupConnectorName = configuration.getBackupConnectorName();
+ // // get list of backup names!
//
// if (!configuration.isSharedStore() && backupConnectorName != null)
// {
- // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+ // TransportConfiguration backupConnector =
+ // configuration.getConnectorConfigurations().get(backupConnectorName);
//
// if (backupConnector == null)
// {
// HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
// "' is not defined in the configuration.");
+ // return false;
// }
- // else
- // {
- //
// replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
+ // threadPool, scheduledPool);
//
// replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
// replicationManager.start();
// }
- // }
//
// return true;
// }
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:20:38 UTC (rev 10887)
@@ -165,6 +165,8 @@
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
+ //liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
@@ -177,8 +179,6 @@
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
config0.setName(LIVE_NODE_NAME);
- config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
- //liveConfig.setBackupConnectorName("toBackup");
config0.setSecurityEnabled(false);
config0.setSharedStore(false);
liveConfig = config0;
13 years, 6 months
JBoss hornetq SVN: r10886 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-27 10:18:52 -0400 (Mon, 27 Jun 2011)
New Revision: 10886
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add some replication support
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:18:52 UTC (rev 10886)
@@ -38,8 +38,12 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
@@ -76,6 +80,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -133,7 +138,7 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
-
+
// JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
// as a way to keep its existence valid and TCK tests
// That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
@@ -147,7 +152,7 @@
// Attributes
// -----------------------------------------------------------------------------------
-
+
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -165,7 +170,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -187,7 +192,7 @@
private volatile RemotingService remotingService;
private volatile ManagementService managementService;
-
+
private volatile ConnectorsService connectorsService;
private MemoryManager memoryManager;
@@ -217,9 +222,9 @@
private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
-
+
private NodeManager nodeManager;
-
+
// Used to identify the server on tests... useful on debugging testcases
private String identity;
@@ -352,7 +357,7 @@
nodeManager.startLiveNode();
initialisePart2();
-
+
log.info("Server is now live");
}
catch (Exception e)
@@ -392,11 +397,11 @@
log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
nodeManager.awaitLiveNode();
-
+
configuration.setBackup(false);
-
+
initialisePart2();
-
+
clusterManager.activate();
log.info("Backup Server is now live");
@@ -511,11 +516,48 @@
{
try
{
- // TODO
+ nodeManager.startBackup();
+ initialisePart1();
+
+ clusterManager.start();
// Try-Connect to live server using live-connector-ref
+ String liveConnectorsName = configuration.getLiveConnectorName();
+ if (liveConnectorsName == null)
+ {
+ throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
+ }
+ final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorsName);
+ log.info("config is " + config);
+ final ServerLocatorInternal serverLocator =
+ (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
+ // XXX Need to retry the connection a couple of times
// sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
+
+ if (liveServerSessionFactory != null)
+ {
+ log.debug("announce backup to live-server");
+ liveServerSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(getNodeID().toString(), true, config));
+ log.info("backup announced");
+ }
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
+ "] started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ // XXX ???
+ configuration.setBackup(false);
+
+ // XXX
+
+ initialisePart2();
}
catch (Exception e)
{
@@ -523,7 +565,7 @@
}
}
- public void close(boolean permanently) throws Exception
+ public void close(final boolean permanently) throws Exception
{
}
}
@@ -556,7 +598,7 @@
test.run();
}
-
+
if (!configuration.isBackup())
{
if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
@@ -577,10 +619,9 @@
HornetQServerImpl.log.info("HornetQ Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started");
}
-
-
- if (configuration.isBackup())
+ else
{
+ // server is Backup
if (configuration.isSharedStore())
{
activation = new SharedStoreBackupActivation();
@@ -588,7 +629,6 @@
else
{
// Replicated
-
activation = new SharedNothingBackupActivation();
}
@@ -635,14 +675,14 @@
managementService.removeNotificationListener(groupingHandler);
groupingHandler = null;
}
-
+
if (clusterManager != null)
{
clusterManager.stop();
}
}
-
+
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
@@ -729,9 +769,9 @@
{
memoryManager.stop();
}
-
+
threadPool.shutdown();
-
+
scheduledPool.shutdown();
try
@@ -747,7 +787,7 @@
}
threadPool = null;
-
+
try
{
if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
@@ -761,7 +801,7 @@
}
threadPool = null;
-
+
scheduledPool = null;
pagingManager = null;
@@ -805,22 +845,22 @@
// HornetQServer implementation
// -----------------------------------------------------------
-
+
public void setIdentity(String identity)
{
this.identity = identity;
}
-
+
public String getIdentity()
{
return identity;
}
-
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
}
-
+
public Configuration getConfiguration()
{
return configuration;
@@ -830,7 +870,7 @@
{
return mbeanServer;
}
-
+
public PagingManager getPagingManager()
{
return pagingManager;
@@ -860,7 +900,7 @@
{
return securityRepository;
}
-
+
public NodeManager getNodeManager()
{
return nodeManager;
@@ -1025,18 +1065,18 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
-
+
public Queue locateQueue(SimpleString queueName) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
Bindable queue = binding.getBindable();
-
+
if (!(queue instanceof Queue))
{
throw new IllegalStateException("locateQueue should only be used to locate queues");
}
-
+
return (Queue) binding.getBindable();
}
@@ -1063,7 +1103,7 @@
}
Queue queue = (Queue)binding.getBindable();
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
@@ -1177,7 +1217,7 @@
protected PagingManager createPagingManager()
{
-
+
return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
configuration.getJournalBufferSize_NIO(),
scheduledPool,
@@ -1187,8 +1227,8 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
+ /**
+ * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
*/
protected StorageManager createStorageManager()
{
@@ -1392,7 +1432,7 @@
addressSettingsDeployer.start();
}
-
+
deployAddressSettingsFromConfiguration();
storageManager.start();
@@ -1450,7 +1490,7 @@
// Load the journal and populate queues, transactions and caches in memory
pagingManager.reloadStores();
-
+
JournalLoadInformation[] journalInfo = loadJournals();
compareJournals(journalInfo);
@@ -1488,7 +1528,7 @@
// this needs to be done before clustering is fully activated
callActivateCallbacks();
- // Deply any pre-defined diverts
+ // Deploy any pre-defined diverts
deployDiverts();
if (deploymentManager != null)
@@ -1564,11 +1604,11 @@
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
-
+
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
-
+
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
@@ -1585,8 +1625,8 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
-
+
+
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1672,12 +1712,12 @@
}
Filter filter = FilterImpl.createFilter(filterString);
-
+
long queueID = storageManager.generateUniqueID();
PageSubscription pageSubscription;
-
-
+
+
if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
{
pageSubscription = null;
@@ -1771,7 +1811,7 @@
managementService.registerDivert(divert, config);
}
-
+
public void destroyDivert(SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -1814,7 +1854,7 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
+
public void deployBridge(BridgeConfiguration config) throws Exception
{
if (clusterManager != null)
@@ -1822,7 +1862,7 @@
clusterManager.deployBridge(config);
}
}
-
+
public void destroyBridge(String name) throws Exception
{
if (clusterManager != null)
@@ -1875,7 +1915,7 @@
{
return sessions.get(sessionName);
}
-
+
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -1896,7 +1936,7 @@
}
}
}
-
+
@Override
public String toString()
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:18:52 UTC (rev 10886)
@@ -56,6 +56,7 @@
// Constants -----------------------------------------------------
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+ protected static final String LIVE_NODE_NAME = "hqLIVE";
// Attributes ----------------------------------------------------
@@ -167,13 +168,15 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
+ config1.setLiveConnectorName(LIVE_NODE_NAME);
backupConfig = config1;
+
backupServer = createBackupServer();
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
+ config0.setName(LIVE_NODE_NAME);
config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
//liveConfig.setBackupConnectorName("toBackup");
config0.setSecurityEnabled(false);
@@ -181,8 +184,8 @@
liveConfig = config0;
liveServer = createLiveServer();
+ liveServer.start();
backupServer.start();
- liveServer.start();
}
@Override
13 years, 6 months
JBoss hornetq SVN: r10885 - in branches/HORNETQ-720_Replication: etc and 7 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-27 10:14:47 -0400 (Mon, 27 Jun 2011)
New Revision: 10885
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs
branches/HORNETQ-720_Replication/hornetq-core/pom.xml
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Packet.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connector.java
Log:
merge changes from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Added: svn:mergeinfo
+ /trunk:10878-10884
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-06-27 14:14:47 UTC (rev 10885)
@@ -1,4 +1,4 @@
-#Fri Jun 17 11:54:22 CEST 2011
+#Thu Jun 23 17:28:54 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -19,6 +19,7 @@
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=82
@@ -87,7 +88,7 @@
org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
+org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=false
org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.ui.prefs 2011-06-27 14:14:47 UTC (rev 10885)
@@ -1,4 +1,4 @@
-#Wed Jun 15 11:33:43 CEST 2011
+#Thu Jun 23 17:29:46 CEST 2011
cleanup.add_default_serial_version_id=false
cleanup.add_generated_serial_version_id=true
cleanup.add_missing_annotations=true
@@ -82,7 +82,7 @@
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=true
sp_cleanup.make_local_variable_final=false
-sp_cleanup.make_parameters_final=true
+sp_cleanup.make_parameters_final=false
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
@@ -100,7 +100,7 @@
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
-sp_cleanup.remove_unnecessary_nls_tags=true
+sp_cleanup.remove_unnecessary_nls_tags=false
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
Modified: branches/HORNETQ-720_Replication/hornetq-core/pom.xml
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/pom.xml 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/pom.xml 2011-06-27 14:14:47 UTC (rev 10885)
@@ -1,5 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -88,6 +88,36 @@
</executions>
</plugin>
</plugins>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence
+ on the Maven build itself. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javacc-maven-plugin</artifactId>
+ <versionRange>[2.6,)</versionRange>
+ <goals>
+ <goal>javacc</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -13,14 +13,13 @@
package org.hornetq.core.client.impl;
-import org.hornetq.api.core.HornetQException;
+import java.util.concurrent.Executor;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
-import java.util.concurrent.Executor;
-
/**
* A ServerLocatorInternal
*
@@ -31,7 +30,7 @@
public interface ServerLocatorInternal extends ServerLocator
{
void start(Executor executor) throws Exception;
-
+
void factoryClosed(final ClientSessionFactory factory);
void setNodeID(String nodeID);
@@ -53,7 +52,7 @@
void setClusterTransportConfiguration(TransportConfiguration tc);
boolean isBackup();
-
+
void setBackup(boolean backup);
Topology getTopology();
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/CoreRemotingConnection.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -24,7 +24,7 @@
*
*/
public interface CoreRemotingConnection extends RemotingConnection
-{
+{
/**
* return the channel with the channel id specified.
* <p/>
@@ -74,7 +74,7 @@
long getIDGeneratorSequence();
/**
- * return the current tomeout for blocking calls
+ * Return the current timeout for blocking calls.
*
* @return the timeout in milliseconds
*/
@@ -86,7 +86,7 @@
* @return the lock
*/
Object getTransferLock();
-
+
/**
* Called periodically to flush any data in the batch buffer
*/
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Packet.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Packet.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/Packet.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -17,21 +17,22 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
- * A Packet represents a pcaket of data transmitted over a connection.
+ * A Packet represents a packet of data transmitted over a connection.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public interface Packet
{
/**
- * This sets the channel id that should be used once the packet has been successfully decoded it is sent to the correct channel.
- *
+ * Sets the channel id that should be used once the packet has been successfully decoded, so
+ * that it is sent to the correct channel.
+ *
* @param channelID the id of the channel to handle the packet
*/
void setChannelID(long channelID);
/**
- * returns the channel id of the channel that should handle this pcaket
+ * Returns the channel id of the channel that should handle this packet.
*
* @return the id of the channel
*/
@@ -81,6 +82,6 @@
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
-
+
boolean isAsyncExec();
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -51,11 +51,9 @@
import org.hornetq.utils.ExecutorFactory;
/**
- * A RepplicationManagerImpl
- *
+ * A ReplicationManagerImpl
+ *
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class ReplicationManagerImpl implements ReplicationManager
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -17,12 +17,16 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -33,7 +37,10 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
@@ -54,7 +61,7 @@
* A ClusterManagerImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 18 Nov 2008 09:23:49
*
*
@@ -92,18 +99,18 @@
// regular client listeners to be notified of cluster topology changes.
// they correspond to regular clients using a HA ServerLocator
- private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
+ private final Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
// cluster connections listeners to be notified of cluster topology changes
// they correspond to cluster connections on *other nodes connected to this one*
- private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
- private Topology topology = new Topology();
+ private final Topology topology = new Topology();
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
- private Executor executor;
+ private final Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -173,7 +180,7 @@
{
announceNode();
}
-
+
started = true;
}
@@ -200,7 +207,7 @@
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
-
+
clusterConnectionListeners.clear();
clientListeners.clear();
clusterConnections.clear();
@@ -238,7 +245,7 @@
}
boolean removed = topology.removeMember(nodeID);
-
+
if (removed)
{
@@ -254,6 +261,7 @@
}
}
+ // XXX Why is the interface's parameter 'backup' now 'last'?
public void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
@@ -349,7 +357,7 @@
{
return topology;
}
-
+
// backup node becomes live
public synchronized void activate()
{
@@ -496,7 +504,7 @@
{
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), false);
@@ -720,7 +728,7 @@
managementService.unregisterBridge(name);
}
}
-
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -816,7 +824,7 @@
clusterConnections.put(config.getName(), clusterConnection);
clusterConnection.start();
-
+
if(backup)
{
announceBackup(config, connector);
@@ -866,7 +874,7 @@
}
catch (Exception e)
{
- log.warn("Unable to announce backup", e);
+ log.warn("Unable to announce backup", e);
}
}
});
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/InVMNodeManager.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -13,25 +13,31 @@
package org.hornetq.core.server.impl;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.FAILING_BACK;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.LIVE;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.NOT_STARTED;
+import static org.hornetq.core.server.impl.InVMNodeManager.State.PAUSED;
+
+import java.util.concurrent.Semaphore;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.NodeManager;
import org.hornetq.utils.UUIDGenerator;
-import java.util.concurrent.Semaphore;
-
-import static org.hornetq.core.server.impl.InVMNodeManager.State.*;
-
/**
- * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a>
- * Date: Oct 13, 2010
- * Time: 3:55:47 PM
+ * This is a _mock_ NodeManager and is used only in tests.
+ * <p>
+ * It allows writing tests without the need to spawn a new JVM.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.com">Andy Taylor</a> Date: Oct 13, 2010 Time: 3:55:47
+ * PM
*/
public class InVMNodeManager extends NodeManager
{
- private Semaphore liveLock;
+ private final Semaphore liveLock;
- private Semaphore backupLock;
+ private final Semaphore backupLock;
public enum State {LIVE, PAUSED, FAILING_BACK, NOT_STARTED}
@@ -119,7 +125,7 @@
@Override
public boolean isAwaitingFailback() throws Exception
{
- return state == FAILING_BACK;
+ return state == FAILING_BACK;
}
@Override
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Acceptor.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -17,11 +17,12 @@
import org.hornetq.core.server.management.NotificationService;
/**
- * An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of dispatching client requests
- * to the Remoting Service's Dispatcher.
- *
+ * An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of
+ * dispatching client requests to the Remoting Service's Dispatcher.
+ *
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
+ * @see Connector
*/
public interface Acceptor extends HornetQComponent
{
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connector.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connector.java 2011-06-27 13:41:35 UTC (rev 10884)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/remoting/Connector.java 2011-06-27 14:14:47 UTC (rev 10885)
@@ -20,17 +20,17 @@
public interface Connector
{
/**
- * starts the connector
+ * Starts the connector.
*/
void start();
/**
- * closes the connector
+ * Closes the connector.
*/
void close();
/**
- * returns true if the connector is started, oterwise false.
+ * Returns true if the connector is started, otherwise false.
*
* @return true if the connector is started
*/
@@ -39,9 +39,9 @@
/**
* Create and return a connection from this connector.
* <p/>
- * This method must NOT throw an exception if it fails to create the connection
- * (e.g. network is not available), in this case it MUST return null
- *
+ * This method must NOT throw an exception if it fails to create the connection (e.g. network is
+ * not available), in this case it MUST return {@code null}.
+ *
* @return The connection, or null if unable to create a connection (e.g. network is unavailable)
*/
Connection createConnection();
13 years, 6 months
JBoss hornetq SVN: r10884 - trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-27 09:41:35 -0400 (Mon, 27 Jun 2011)
New Revision: 10884
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
Document parameter name swap 'confusion'
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-27 13:41:00 UTC (rev 10883)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-27 13:41:35 UTC (rev 10884)
@@ -17,12 +17,16 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -33,7 +37,10 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
@@ -54,7 +61,7 @@
* A ClusterManagerImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 18 Nov 2008 09:23:49
*
*
@@ -92,18 +99,18 @@
// regular client listeners to be notified of cluster topology changes.
// they correspond to regular clients using a HA ServerLocator
- private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
+ private final Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
// cluster connections listeners to be notified of cluster topology changes
// they correspond to cluster connections on *other nodes connected to this one*
- private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
- private Topology topology = new Topology();
+ private final Topology topology = new Topology();
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
- private Executor executor;
+ private final Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -173,7 +180,7 @@
{
announceNode();
}
-
+
started = true;
}
@@ -200,7 +207,7 @@
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
-
+
clusterConnectionListeners.clear();
clientListeners.clear();
clusterConnections.clear();
@@ -238,7 +245,7 @@
}
boolean removed = topology.removeMember(nodeID);
-
+
if (removed)
{
@@ -254,6 +261,7 @@
}
}
+ // XXX Why is the interface's parameter 'backup' now 'last'?
public void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last,
@@ -349,7 +357,7 @@
{
return topology;
}
-
+
// backup node becomes live
public synchronized void activate()
{
@@ -496,7 +504,7 @@
{
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, member.getConnector(), false);
@@ -720,7 +728,7 @@
managementService.unregisterBridge(name);
}
}
-
+
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
{
if (config.getName() == null)
@@ -816,7 +824,7 @@
clusterConnections.put(config.getName(), clusterConnection);
clusterConnection.start();
-
+
if(backup)
{
announceBackup(config, connector);
@@ -866,7 +874,7 @@
}
catch (Exception e)
{
- log.warn("Unable to announce backup", e);
+ log.warn("Unable to announce backup", e);
}
}
});
13 years, 6 months