JBoss hornetq SVN: r11380 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-21 05:52:24 -0400 (Wed, 21 Sep 2011)
New Revision: 11380
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
Log:
Add javadoc to page(...)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-20 23:57:01 UTC (rev 11379)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-21 09:52:24 UTC (rev 11380)
@@ -72,6 +72,11 @@
boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
+ /**
+ * Write message to page if we are paging.
+ * @return {@code true} if we are paging and have handled the data, {@code false} if the data
+ * needs to be sent to the journal
+ */
boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx) throws Exception;
Page createPage(final int page) throws Exception;
13 years, 3 months
JBoss hornetq SVN: r11379 - in branches/Branch_2_2_EAP: src/main/org/hornetq/api/core/client and 12 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-20 19:57:01 -0400 (Tue, 20 Sep 2011)
New Revision: 11379
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
JBPAPP-7230 & JBPAPP-7229 - ClientID fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -135,6 +135,12 @@
public static final int DUPLICATE_ID_REJECTED = 113;
+ /**
+ * A session metadata entry was set in duplicate (the same key and value had already been set)
+ */
+ public static final int DUPLICATE_METADATA = 114;
+
+
// Native Error codes ----------------------------------------------
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -569,6 +569,12 @@
void addMetaData(String key, String data) throws HornetQException;
/**
+ * Attach any metadata to the session. Throws an exception if the same metadata (key and value) has already been set
+ * @throws HornetQException
+ */
+ void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
* Attach any metadata to the session.
* Sends a Metadata using the older version
* @deprecated Use {@link ClientSession#addMetaData(String, String)}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -65,6 +65,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1146,6 +1147,11 @@
}
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
+
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
+ }
public ClientSessionFactoryInternal getSessionFactory()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -593,4 +593,13 @@
return session.getChannel();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ session.addUniqueMetaData(key, data);
+
+ }
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -79,6 +79,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+ {
+ SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage)packet;
+ if (session.addUniqueMetaData(message.getKey(), message.getData()))
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new HornetQException(HornetQException.DUPLICATE_METADATA,
+ "Metadata " + message.getKey() +
+ "=" +
+ message.getData() +
+ " had been set already"));
+ }
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -44,6 +44,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -141,6 +142,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -538,6 +540,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case SESS_UNIQUE_ADD_METADATA:
+ {
+ packet = new SessionUniqueAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -181,13 +181,17 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC = 103;
-
- // HA
public static final byte SESS_ADD_METADATA = 104;
public static final byte SESS_ADD_METADATA2 = 105;
+ public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+
+
+ // HA
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
@@ -200,6 +204,7 @@
public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -30,7 +30,7 @@
private String key;
private String data;
/**
- * It won require confirmation during failover / reconnect
+ * It's not required confirmation during failover / reconnect
*/
private boolean requiresConfirmation = true;
@@ -39,6 +39,11 @@
super(PacketImpl.SESS_ADD_METADATA2);
}
+ protected SessionAddMetaDataMessageV2(byte packetCode)
+ {
+ super(packetCode);
+ }
+
public SessionAddMetaDataMessageV2(String k, String d)
{
this();
@@ -46,6 +51,13 @@
data = d;
}
+ protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+ {
+ super(packetCode);
+ key = k;
+ data = d;
+ }
+
public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
{
this();
Added: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java (rev 0)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionUniqueAddMetaDataMessage()
+ {
+ super(SESS_UNIQUE_ADD_METADATA);
+ }
+
+
+ public SessionUniqueAddMetaDataMessage(String key, String data)
+ {
+ super(SESS_UNIQUE_ADD_METADATA, key, data);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -65,7 +65,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
@@ -95,7 +95,7 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -113,7 +113,7 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
-
+
this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -136,13 +136,13 @@
this.scheduledThreadPool = scheduledThreadPool;
- this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.CORE,
+ new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
// difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
- this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
- interceptors));
+ this.protocolMap.put(ProtocolType.STOMP,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS,
+ new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -168,9 +168,9 @@
// This needs to be a different thread pool to the main thread pool especially for OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept small for better performance
- ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() + "-" + System.identityHashCode(this),
- false,
- tccl);
+ ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() +
+ "-" +
+ System.identityHashCode(this), false, tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -275,7 +275,6 @@
}
failureCheckAndFlushThread.close();
-
// We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
@@ -297,7 +296,7 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
if (log.isTraceEnabled())
{
log.trace("Sending connection.disconnection packet to " + conn);
@@ -398,23 +397,23 @@
{
log.trace("Connection created " + connection);
}
-
+
connections.put(connection.getID(), entry);
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
- if (isTrace)
- {
- log.trace("Connection removed " + connectionID + " from server " + this.server, new Exception ("trace"));
- }
-
+ if (isTrace)
+ {
+ log.trace("Connection removed " + connectionID + " from server " + this.server, new Exception("trace"));
+ }
+
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
@@ -459,7 +458,7 @@
// Connections should only fail when TTL is exceeded
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -496,10 +495,10 @@
}
else
{
- if (log.isTraceEnabled())
- {
- log.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet");
- }
+ if (log.isTraceEnabled())
+ {
+ log.trace("ConnectionID = " + connectionID + " was already closed, so ignoring packet");
+ }
}
}
}
@@ -540,76 +539,85 @@
{
while (!closed)
{
- long now = System.currentTimeMillis();
+ try
+ {
+ long now = System.currentTimeMillis();
- Set<Object> idsToRemove = new HashSet<Object>();
+ Set<Object> idsToRemove = new HashSet<Object>();
- for (ConnectionEntry entry : connections.values())
- {
- RemotingConnection conn = entry.connection;
+ for (ConnectionEntry entry : connections.values())
+ {
+ RemotingConnection conn = entry.connection;
- boolean flush = true;
+ boolean flush = true;
- if (entry.ttl != -1)
- {
- if (now >= entry.lastCheck + entry.ttl)
+ if (entry.ttl != -1)
{
- if (!conn.checkDataReceived())
+ if (now >= entry.lastCheck + entry.ttl)
{
- idsToRemove.add(conn.getID());
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
- flush = false;
+ flush = false;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
}
- else
- {
- entry.lastCheck = now;
- }
}
+
+ if (flush)
+ {
+ conn.flush();
+ }
}
- if (flush)
+ for (Object id : idsToRemove)
{
- conn.flush();
+ RemotingConnection conn = removeConnection(id);
+ if (conn != null)
+ {
+ HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Did not receive data from " + conn.getRemoteAddress() +
+ ". It is likely the client has exited or crashed without " +
+ "closing its connection, or the network between the server and client has failed. " +
+ "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+ "Please check user manual for more information." +
+ " The connection will now be closed.");
+ conn.fail(me);
+ }
}
- }
- for (Object id : idsToRemove)
- {
- RemotingConnection conn = removeConnection(id);
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from " + conn.getRemoteAddress() +
- ". It is likely the client has exited or crashed without " +
- "closing its connection, or the network between the server and client has failed. " +
- "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
- "Please check user manual for more information." +
- " The connection will now be closed.");
- conn.fail(me);
- }
+ long start = System.currentTimeMillis();
- synchronized (this)
- {
- long toWait = pauseInterval;
-
- long start = System.currentTimeMillis();
-
- while (!closed && toWait > 0)
- {
- try
+ while (!closed && toWait > 0)
{
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -132,6 +132,9 @@
List<ServerSession> getSessions(String connectionID);
+ /** will return true if there is any session with this metadata key set to this value */
+ boolean lookupSession(String metakey, String metavalue);
+
ClusterManager getClusterManager();
SimpleString getNodeID();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -121,6 +121,8 @@
void addMetaData(String key, String data);
+ boolean addUniqueMetaData(String key, String data);
+
String getMetaData(String key);
String[] getTargetAddresses();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -797,6 +797,23 @@
sessions.remove(name);
}
+ public boolean lookupSession(String key, String value)
+ {
+ // getSessions is called here in an attempt to minimize locking the Server while this check is being done
+ Set<ServerSession> allSessions = getSessions();
+
+ for (ServerSession session : allSessions)
+ {
+ String metaValue = session.getMetaData(key);
+ if (metaValue != null && metaValue.equals(value))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -1194,6 +1194,21 @@
metaData.put(key, data);
}
+
+ public boolean addUniqueMetaData(String key, String data)
+ {
+ if (server.lookupSession(key, data))
+ {
+ // There is a duplication of this property
+ return false;
+ }
+ else
+ {
+ addMetaData(key, data);
+ return true;
+ }
+ }
+
public String getMetaData(String key)
{
String data = null;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -181,6 +181,18 @@
{
throw new IllegalStateException("setClientID can only be called directly after the connection is created");
}
+
+ try
+ {
+ initialSession.addUniqueMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+ {
+ throw new IllegalStateException("clientID=" + clientID + " was already set into another connection");
+ }
+ }
this.clientID = clientID;
try
Modified: branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -88,8 +88,21 @@
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+
+ Connection connection2 = JMSTest.cf.createConnection();
+ try
+ {
+ connection2.setClientID(clientID);
+ fail("setClientID was expected to throw an exception");
+ }
+ catch (JMSException e)
+ {
+ // expected
+ }
connection.close();
+
+ connection2.setClientID(clientID);
}
public void testSetClientAfterStart() throws Exception
Modified: branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -1417,6 +1417,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -1666,8 +1666,8 @@
null,
groupAddress,
port,
- 5000,
- 5000);
+ 1000,
+ 1000);
configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-20 16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-20 23:57:01 UTC (rev 11379)
@@ -420,7 +420,7 @@
locator.close();
}
-
+
public void testMultipleClientSessionFactories() throws Throwable
{
startServers(0, 1, 2, 3, 4);
13 years, 3 months
JBoss hornetq SVN: r11378 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-20 12:40:18 -0400 (Tue, 20 Sep 2011)
New Revision: 11378
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
quick fix
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20 15:46:55 UTC (rev 11377)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20 16:40:18 UTC (rev 11378)
@@ -165,7 +165,7 @@
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
-
+
private AfterConnectInternalListener afterConnectListener;
private String groupID;
@@ -549,7 +549,7 @@
}
});
}
-
+
public Executor getExecutor()
{
return startExecutor;
@@ -592,7 +592,7 @@
{
return afterConnectListener;
}
-
+
public boolean isClosed()
{
return closed || closing;
@@ -1115,7 +1115,7 @@
{
return identity;
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1287,15 +1287,17 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
-
+
if (topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
// Resetting the topology to its original condition as it was brand new
- topologyArray = null;
-
- receivedTopology = false;
+ synchronized (this)
+ {
+ topologyArray = null;
+ receivedTopology = false;
+ }
}
else
{
@@ -1406,7 +1408,7 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
-
+
if (topology != null && topology.getMember(entry.getNodeID()) == null)
{
TopologyMember member = new TopologyMember(entry.getConnector(), null);
13 years, 3 months
JBoss hornetq SVN: r11377 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-20 11:46:55 -0400 (Tue, 20 Sep 2011)
New Revision: 11377
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20 15:11:34 UTC (rev 11376)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20 15:46:55 UTC (rev 11377)
@@ -106,7 +106,7 @@
}
- private static final long TIMEOUT_START_SERVER = 10;
+ private static final long TIMEOUT_START_SERVER = 400;
@Override
protected void setUp() throws Exception
13 years, 3 months
JBoss hornetq SVN: r11376 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-20 11:11:34 -0400 (Tue, 20 Sep 2011)
New Revision: 11376
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
fixing tests
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20 14:43:22 UTC (rev 11375)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20 15:11:34 UTC (rev 11376)
@@ -1288,7 +1288,7 @@
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
- if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
+ if (topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
13 years, 3 months
JBoss hornetq SVN: r11375 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 2 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-09-20 10:43:22 -0400 (Tue, 20 Sep 2011)
New Revision: 11375
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -410,6 +410,14 @@
response.setNeedsDisconnect(true);
}
}
+ else
+ {
+ //request null, disconnect if so.
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ this.connection.disconnect();
+ }
+ }
return response;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -180,6 +180,14 @@
response.setNeedsDisconnect(true);
}
}
+ else
+ {
+ //request null, disconnect if so.
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ this.connection.disconnect();
+ }
+ }
return response;
}
@@ -461,7 +469,7 @@
if (reply.needsDisconnect())
{
- connection.destroy();
+ connection.disconnect();
}
else
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -37,7 +37,10 @@
protected static final String LOGIN_HEADER = "login";
protected static final String PASSCODE_HEADER = "passcode";
+ //ext
+ protected static final String CLIENT_ID_HEADER = "client-id";
+
protected String version;
protected String host;
protected int port;
@@ -193,6 +196,21 @@
connect(null, null);
}
+ public void destroy()
+ {
+ try
+ {
+ close();
+ }
+ catch (IOException e)
+ {
+ }
+ finally
+ {
+ this.connected = false;
+ }
+ }
+
public void connect(String username, String password) throws Exception
{
throw new RuntimeException("connect method not implemented!");
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -32,6 +32,8 @@
void connect(String defUser, String defPass) throws Exception;
+ void connect(String defUser, String defPass, String clientId) throws Exception;
+
boolean isConnected();
String getVersion();
@@ -44,6 +46,8 @@
void startPinger(long interval);
void stopPinger();
+
+ void destroy();
}
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -47,6 +47,26 @@
}
}
+ public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ frame.addHeader(CLIENT_ID_HEADER, clientID);
+
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ connected = true;
+ }
+ else
+ {
+ System.out.println("Connection failed with: " + response);
+ connected = false;
+ }
+ }
+
@Override
public void disconnect() throws IOException, InterruptedException
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -62,7 +62,37 @@
connected = false;
}
}
+
+ public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException
+ {
+ ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
+ frame.addHeader(ACCEPT_HEADER, "1.1");
+ frame.addHeader(HOST_HEADER, "localhost");
+ frame.addHeader(CLIENT_ID_HEADER, clientID);
+
+ if (username != null)
+ {
+ frame.addHeader(LOGIN_HEADER, username);
+ frame.addHeader(PASSCODE_HEADER, passcode);
+ }
+ ClientStompFrame response = this.sendFrame(frame);
+
+ if (response.getCommand().equals(CONNECTED_COMMAND))
+ {
+ String version = response.getHeader(VERSION_HEADER);
+ assert(version.equals("1.1"));
+
+ this.username = username;
+ this.passcode = passcode;
+ this.connected = true;
+ }
+ else
+ {
+ connected = false;
+ }
+ }
+
public void connect1(String username, String passcode) throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-20 11:53:56 UTC (rev 11374)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-20 14:43:22 UTC (rev 11375)
@@ -18,6 +18,7 @@
package org.hornetq.tests.integration.stomp.v11;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -631,7 +632,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub1", messageID);
+ ack(connV11, "sub1", messageID, null);
unsubscribe(connV11, "sub1");
@@ -657,7 +658,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub2", messageID);
+ ack(connV11, "sub2", messageID, null);
ClientStompFrame error = connV11.receiveFrame();
@@ -687,7 +688,7 @@
System.out.println("Received message with id " + messageID);
- ack(connV11, "sub2", "someother");
+ ack(connV11, "sub2", "someother", null);
ClientStompFrame error = connV11.receiveFrame();
@@ -931,28 +932,197 @@
Assert.assertNull(message);
}
- private void ack(StompClientConnection connV112, String subId,
+ //tests below are adapted from StompTest
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ beginTransaction(connV11, "tx1");
+
+ beginTransaction(connV11, "tx1");
+
+ ClientStompFrame f = connV11.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals("ERROR"));
+ }
+
+ public void testBodyWithUTF8() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "auto");
+
+ String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ System.out.println(frame);
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(text));
+
+ connV11.disconnect();
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+ Assert.assertNotNull(frame.getHeader("message-id"));
+
+ String messageID = frame.getHeader("message-id");
+
+ beginTransaction(connV11, "tx1");
+
+ this.ack(connV11, getName(), messageID, "tx1");
+
+ abortTransaction(connV11, "tx1");
+
+ frame = connV11.receiveFrame();
+
+ assertNull(frame);
+
+ this.unsubscribe(connV11, getName());
+
+ connV11.disconnect();
+ }
+
+ public void testDisconnectAndError() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // sending a message will result in an error
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ try
+ {
+ connV11.sendFrame(sendFrame);
+ fail("connection should have been closed by server.");
+ }
+ catch (ClosedChannelException e)
+ {
+ //ok.
+ }
+
+ connV11.destroy();
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("ERROR"));
+
+ connV11.disconnect();
+ }
+
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
+
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ // we must have received the message
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame abortFrame = conn.createFrame("ABORT");
+ abortFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(abortFrame);
+ }
+
+ private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("BEGIN");
+ beginFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(beginFrame);
+ }
+
+ private void ack(StompClientConnection conn, String subId,
ClientStompFrame frame) throws IOException, InterruptedException
{
String messageID = frame.getHeader("message-id");
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", messageID);
- ClientStompFrame response = connV11.sendFrame(ackFrame);
+ ClientStompFrame response = conn.sendFrame(ackFrame);
if (response != null)
{
throw new IOException("failed to ack " + response);
}
}
- private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+ private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", mid);
+ if (txID != null)
+ {
+ ackFrame.addHeader("transaction", txID);
+ }
conn.sendFrame(ackFrame);
}
@@ -976,6 +1146,30 @@
conn.sendFrame(subFrame);
}
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", ack);
+ subFrame.addHeader("durable-subscriber-name", durableId);
+
+ conn.sendFrame(subFrame);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+ subFrame.addHeader("ack", ack);
+ subFrame.addHeader("durable-subscriber-name", durableId);
+
+ conn.sendFrame(subFrame);
+ }
+
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
{
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
13 years, 3 months
JBoss hornetq SVN: r11374 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-09-20 07:53:56 -0400 (Tue, 20 Sep 2011)
New Revision: 11374
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/TestSupportPageStore.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Delete TestSupportPageStore which serves no purpose.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-20 01:20:26 UTC (rev 11373)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-20 11:53:56 UTC (rev 11374)
@@ -67,7 +67,7 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
-public class PagingStoreImpl implements TestSupportPageStore
+public class PagingStoreImpl implements PagingStore
{
// Constants -----------------------------------------------------
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/TestSupportPageStore.java 2011-09-20 01:20:26 UTC (rev 11373)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/TestSupportPageStore.java 2011-09-20 11:53:56 UTC (rev 11374)
@@ -1,25 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.paging.impl;
-
-import org.hornetq.core.paging.PagingStore;
-
-/**
- * All the methods required to TestCases on PageStoreImpl
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public interface TestSupportPageStore extends PagingStore
-{
-}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-09-20 01:20:26 UTC (rev 11373)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2011-09-20 11:53:56 UTC (rev 11374)
@@ -55,7 +55,6 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
-import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
@@ -70,7 +69,7 @@
* A PagingTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
+ *
* Created Dec 5, 2008 8:25:58 PM
*
*
@@ -967,6 +966,7 @@
threads[start - 1] = new Thread()
{
+ @Override
public void run()
{
try
@@ -1214,7 +1214,7 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
*/
public void testDepageDuringTransaction() throws Exception
{
@@ -1355,9 +1355,9 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
* Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
- *
+ *
*/
public void testDepageDuringTransaction2() throws Exception
{
@@ -1680,6 +1680,7 @@
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -1808,6 +1809,7 @@
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -1962,7 +1964,7 @@
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
- TestSupportPageStore store = (TestSupportPageStore)server.getPostOffice()
+ PagingStore store = server.getPostOffice()
.getPagingManager()
.getPageStore(PagingTest.ADDRESS);
@@ -2806,7 +2808,7 @@
catch (Throwable ignored)
{
}
-
+
OperationContextImpl.clearContext();
}
@@ -3181,6 +3183,7 @@
Thread consumeThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionConsumer = null;
@@ -3584,7 +3587,7 @@
message.setBodyInputStream(createFakeLargeStream(messageSize));
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -3598,27 +3601,27 @@
ClientConsumer cons = session.createConsumer(ADDRESS);
ClientMessage msg = null;
-
+
for (int msgNr = 0 ; msgNr < 2; msgNr++)
{
for (int i = 0 ; i < 5; i++)
{
msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
assertEquals("str" + msgNr, msg.getStringProperty("id"));
-
+
for (int j = 0; j < messageSize; j++)
{
assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
}
-
+
session.rollback();
}
-
+
pgStoreDLA.startPaging();
}
@@ -3639,27 +3642,27 @@
});
}
-
+
assertNull(cons.receiveImmediate());
cons.close();
-
+
sf.close();
-
+
locator.close();
-
+
server.stop();
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
cons = session.createConsumer(ADDRESS);
for (int i = 2; i < 100; i++)
@@ -3678,9 +3681,9 @@
}
});
}
-
+
cons.close();
-
+
cons = session.createConsumer("DLA");
for (int msgNr = 0 ; msgNr < 2; msgNr++)
@@ -3688,38 +3691,38 @@
msg = cons.receive(5000);
assertNotNull(msg);
-
+
assertEquals("str" + msgNr, msg.getStringProperty("id"));
for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
-
+
msg.acknowledge();
}
-
+
cons.close();
-
+
cons = session.createConsumer(ADDRESS);
-
+
session.commit();
-
+
assertNull(cons.receiveImmediate());
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
pgStoreAddress.getCursorProvider().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
-
+
pgStoreAddress.getCursorProvider().cleanup();
-
+
while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.commit();
@@ -3791,7 +3794,7 @@
message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setExpiration(System.currentTimeMillis() + 2000);
if (i % 2 == 0)
@@ -3809,7 +3812,7 @@
}
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -3821,30 +3824,30 @@
}
session.commit();
-
+
sf.close();
-
+
locator.close();
-
+
server.stop();
-
+
Thread.sleep(3000);
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
ClientConsumer consAddr = session.createConsumer(ADDRESS);
-
+
assertNull(consAddr.receive(1000));
-
-
+
+
ClientConsumer cons = session.createConsumer("DLA");
for (int i = 0; i < 500; i++)
@@ -3863,22 +3866,22 @@
}
});
}
-
+
assertNull(cons.receiveImmediate());
-
+
session.commit();
-
+
cons.close();
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.close();
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-09-20 01:20:26 UTC (rev 11373)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2011-09-20 11:53:56 UTC (rev 11374)
@@ -23,9 +23,9 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
@@ -40,7 +40,7 @@
import org.hornetq.utils.OrderedExecutorFactory;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -64,13 +64,13 @@
AddressSettings settings = new AddressSettings();
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setDefault(settings);
-
-
+
+
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
100, null,
new OrderedExecutorFactory(Executors.newCachedThreadPool()),
true);
-
+
storeFactory.setPostOffice(new FakePostOffice());
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory,
@@ -79,7 +79,7 @@
managerImpl.start();
- TestSupportPageStore store = (TestSupportPageStore)managerImpl.getPageStore(new SimpleString("simple-test"));
+ PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-20 01:20:26 UTC (rev 11373)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-20 11:53:56 UTC (rev 11374)
@@ -40,7 +40,6 @@
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
-import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -153,7 +152,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ PagingStore storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -218,7 +218,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ PagingStoreImpl storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -294,7 +295,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ PagingStore storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -442,7 +444,8 @@
settings.setPageSizeBytes(MAX_SIZE);
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ final PagingStore storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -606,7 +609,8 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ PagingStore storeImpl2 =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -692,7 +696,8 @@
settings.setPageSizeBytes(MAX_SIZE);
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ final PagingStore storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
@@ -735,7 +740,8 @@
settings.setPageSizeBytes(MAX_SIZE);
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+ final PagingStore storeImpl =
+ new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
null,
100,
createMockManager(),
13 years, 3 months
JBoss hornetq SVN: r11373 - branches.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-09-19 21:20:26 -0400 (Mon, 19 Sep 2011)
New Revision: 11373
Added:
branches/HORNETQ-316_for_2_2_EAP/
Log:
HORNETQ-316 Branch based on Branch_2_2_EAP
13 years, 3 months
JBoss hornetq SVN: r11372 - branches.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-09-19 21:00:48 -0400 (Mon, 19 Sep 2011)
New Revision: 11372
Added:
branches/HORNETQ-316_for-r11196/
Removed:
branches/HORNETQ-316/
Log:
backup
13 years, 3 months
JBoss hornetq SVN: r11371 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core/impl and 15 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-09-19 20:50:32 -0400 (Mon, 19 Sep 2011)
New Revision: 11371
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Refactoring clustering manager / cluster connection
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -53,6 +53,7 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -367,7 +368,7 @@
// ConnectionLifeCycleListener implementation --------------------------------------------------
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -1111,6 +1111,11 @@
}
}
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1282,8 +1287,8 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
-
- if (topology.removeMember(eventTime, nodeID))
+
+ if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) && topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -39,8 +39,12 @@
void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
- /** Used to better identify Cluster Connection Locators on logs while debugging logs */
+ /** Used to better identify Cluster Connection Locators in the logs, to facilitate eventual debugging.
+ *
+ * This method used to be on the tests interface, but I'm now making it part of the public interface. */
void setIdentity(String identity);
+
+ String getIdentity();
void setNodeID(String nodeID);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -108,7 +108,7 @@
{
if (log.isDebugEnabled())
{
- log.info(this + "::Live node " + nodeId + "=" + memberInput);
+ log.debug(this + "::node " + nodeId + "=" + memberInput);
}
memberInput.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
currentMember +
", memberInput=" +
memberInput +
- "newMember=" + newMember);
+ "newMember=" + newMember, new Exception ("trace"));
}
@@ -301,7 +301,7 @@
{
if (member.getUniqueEventID() > uniqueEventID)
{
- log.info("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+ log.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
member = null;
}
else
@@ -482,22 +482,17 @@
public synchronized String describe(final String text)
{
- String desc = text + "\n";
+ String desc = text + "topology on " + this + ":\n";
for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(mapTopology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
}
desc += "\t" + "nodes=" + nodes() + "\t" + "members=" + members();
- return desc;
- }
-
- public void clear()
- {
- if (Topology.log.isDebugEnabled())
+ if (mapTopology.isEmpty())
{
- Topology.log.debug(this + "::clear", new Exception("trace"));
+ desc += "\tEmpty";
}
- mapTopology.clear();
+ return desc;
}
public int members()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -42,6 +42,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -68,7 +69,7 @@
this.interceptors = interceptors;
}
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
final Configuration config = server.getConfiguration();
@@ -177,16 +178,18 @@
};
final boolean isCC = msg.isClusterConnection();
-
- server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
- rc.addCloseListener(new CloseListener()
+ if (acceptorUsed.getClusterConnection() != null)
{
- public void connectionClosed()
+ acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+
+ rc.addCloseListener(new CloseListener()
{
- server.getClusterManager().removeClusterTopologyListener(listener, isCC);
- }
- });
+ public void connectionClosed()
+ {
+ acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ }
+ });
+ }
}
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
@@ -205,7 +208,8 @@
{
log.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
- server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
+
+ acceptorUsed.getClusterConnection().nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -38,6 +38,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.UUIDGenerator;
@@ -109,7 +110,7 @@
// ProtocolManager implementation --------------------------------
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
{
StompConnection conn = new StompConnection(connection, this);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -54,16 +55,21 @@
private volatile boolean started;
private final ExecutorFactory executorFactory;
+
+ private final ClusterConnection clusterConnection;
private boolean paused;
private NotificationService notificationService;
- public InVMAcceptor(final Map<String, Object> configuration,
+ public InVMAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.listener = listener;
@@ -73,6 +79,11 @@
executorFactory = new OrderedExecutorFactory(threadPool);
}
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -189,7 +200,7 @@
throw new IllegalStateException("Acceptor is not started");
}
- new InVMConnection(id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
+ new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector), clientExecutor);
}
public void disconnect(final String connectionID)
@@ -209,6 +220,8 @@
private class Listener implements ConnectionLifeCycleListener
{
+ //private static Listener instance = new Listener();
+
private final InVMConnector connector;
Listener(final InVMConnector connector)
@@ -216,14 +229,14 @@
this.connector = connector;
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -17,6 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class InVMAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool);
+ return new InVMAcceptor(clusterConnection, configuration, handler, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -57,15 +58,17 @@
private volatile boolean closing;
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor executor)
{
- this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
+ this(acceptor, serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
}
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final String id,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -81,7 +84,7 @@
this.executor = executor;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
public void close()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -172,19 +172,20 @@
final ConnectionLifeCycleListener listener,
final Executor serverExecutor)
{
- return new InVMConnection(id, handler, listener, serverExecutor);
+ // No acceptor on a client connection
+ return new InVMConnection(null, id, handler, listener, serverExecutor);
}
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -87,6 +88,8 @@
{
static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ private ClusterConnection clusterConnection;
+
private ChannelFactory channelFactory;
private volatile ChannelGroup serverChannelGroup;
@@ -158,6 +161,7 @@
private final long batchDelay;
private final boolean directDeliver;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -166,6 +170,21 @@
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
+ this(null, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+ }
+
+
+ public NettyAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
+ final BufferHandler handler,
+ final BufferDecoder decoder,
+ final ConnectionLifeCycleListener listener,
+ final Executor threadPool,
+ final ScheduledExecutorService scheduledThreadPool)
+ {
+
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.decoder = decoder;
@@ -618,6 +637,14 @@
{
this.notificationService = notificationService;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.Acceptor#getClusterConnection()
+ */
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
// Inner classes -----------------------------------------------------------------------------
@@ -633,7 +660,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
+ new NettyConnection(NettyAcceptor.this, e.getChannel(), new Listener(), !httpEnabled && batchDelay > 0, directDeliver);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -662,14 +689,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection, NettyAcceptor.this.protocol);
+ listener.connectionCreated(acceptor, connection, NettyAcceptor.this.protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -18,6 +18,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class NettyAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection connection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
+ return new NettyAcceptor(connection, configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.ReadyListener;
@@ -72,6 +73,15 @@
boolean batchingEnabled,
boolean directDeliver)
{
+ this(null, channel, listener, batchingEnabled, directDeliver);
+ }
+
+ public NettyConnection(final Acceptor acceptor,
+ final Channel channel,
+ final ConnectionLifeCycleListener listener,
+ boolean batchingEnabled,
+ boolean directDeliver)
+ {
this.channel = channel;
this.listener = listener;
@@ -80,7 +90,7 @@
this.directDeliver = directDeliver;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
// Public --------------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -495,7 +496,8 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled && batchDelay > 0, false);
+ // No acceptor on a client connection
+ NettyConnection conn = new NettyConnection(null, ch, new Listener(), !httpEnabled && batchDelay > 0, false);
return conn;
}
@@ -689,7 +691,7 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -39,6 +39,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -94,6 +95,8 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
+
+ private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -101,7 +104,8 @@
// Constructors --------------------------------------------------
- public RemotingServiceImpl(final Configuration config,
+ public RemotingServiceImpl(final ClusterManager clusterManager,
+ final Configuration config,
final HornetQServer server,
final ManagementService managementService,
final ScheduledExecutorService scheduledThreadPool)
@@ -109,6 +113,8 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
+
+ this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
@@ -202,7 +208,9 @@
ProtocolManager manager = protocolMap.get(protocol);
- Acceptor acceptor = factory.createAcceptor(info.getParams(),
+ // TODO: parameterize the cluster connection
+ Acceptor acceptor = factory.createAcceptor(clusterManager.getDefaultConnection(),
+ info.getParams(),
new DelegatingBufferHandler(),
manager,
this,
@@ -370,7 +378,7 @@
return protocolMap.get(protocol);
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
if (server == null)
{
@@ -384,7 +392,7 @@
throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+ ConnectionEntry entry = pmgr.createConnectionEntry(acceptor, connection);
if (isTrace)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -15,9 +15,11 @@
import java.util.Map;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
@@ -37,7 +39,13 @@
String getNodeID();
HornetQServer getServer();
+
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
+ void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
+
/**
* @return a Map of node ID and addresses
*/
@@ -47,8 +55,14 @@
TransportConfiguration getConnector();
+ Topology getTopology();
+
void flushExecutor();
// for debug
String describe();
+
+ void informTopology();
+
+ void announceBackup();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -16,11 +16,7 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.server.HornetQComponent;
@@ -37,26 +33,26 @@
Map<String, Bridge> getBridges();
Set<ClusterConnection> getClusterConnections();
+
+ /**
+ * Return the default ClusterConnection to be used in case it's not defined by the acceptor
+ * @return
+ */
+ ClusterConnection getDefaultConnection();
ClusterConnection getClusterConnection(SimpleString name);
Set<BroadcastGroup> getBroadcastGroups();
-
- void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
- void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-
void activate();
- void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
-
- Topology getTopology();
-
void flushExecutor();
void announceBackup() throws Exception;
+
+ void deploy() throws Exception;
- void deployBridge(BridgeConfiguration config) throws Exception;
+ void deployBridge(BridgeConfiguration config, boolean start) throws Exception;
void destroyBridge(String name) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -33,6 +33,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.AfterConnectInternalListener;
@@ -46,6 +48,7 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -79,8 +82,6 @@
private final ExecutorFactory executorFactory;
- private final Topology clusterManagerTopology;
-
private final Executor executor;
private final HornetQServer server;
@@ -140,9 +141,17 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
+
+
+ // Stuff that used to be on the ClusterManager
+
+ private final Topology topology = new Topology(this);
+
+ private volatile ServerLocatorInternal backupServerLocator;
+
+
public ClusterConnectionImpl(final ClusterManagerInternal manager,
- final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
@@ -204,6 +213,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -227,8 +238,6 @@
this.callTimeout = callTimeout;
- this.clusterManagerTopology = clusterManagerTopology;
-
clusterConnector = new StaticClusterConnector(tcConfigs);
if (tcConfigs != null && tcConfigs.length > 0)
@@ -244,7 +253,6 @@
}
public ClusterConnectionImpl(final ClusterManagerImpl manager,
- final Topology clusterManagerTopology,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -308,6 +316,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -330,11 +340,9 @@
clusterConnector = new DiscoveryClusterConnector(dg);
this.manager = manager;
-
- this.clusterManagerTopology = clusterManagerTopology;
}
- public void start() throws Exception
+ public void start() throws Exception
{
synchronized (this)
{
@@ -410,13 +418,21 @@
props);
managementService.sendNotification(notification);
}
+
+
executor.execute(new Runnable()
{
public void run()
{
synchronized (ClusterConnectionImpl.this)
{
+ if (backupServerLocator != null)
+ {
+ backupServerLocator.close();
+ backupServerLocator = null;
+ }
+
if (serverLocator != null)
{
serverLocator.close();
@@ -430,12 +446,97 @@
started = false;
}
+
+ public void announceBackup()
+ {
+ this.backupServerLocator = clusterConnector.createServerLocator(false);
+
+ backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
+
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+ }
+ ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
+ log.info("backup announced");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup, retrying", e);
+ }
+ }
+ });
+ }
+
+ private TopologyMember getLocalMember()
+ {
+ return topology.getMember(manager.getNodeId());
+ }
+
+ public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ {
+ topology.addClusterTopologyListener(listener);
+
+ // no need to use an executor here since the Topology is already using one
+ topology.sendTopology(listener);
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
+ {
+ topology.removeClusterTopologyListener(listener);
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean backup)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
+ }
+
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
+ {
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
+ }
+ else
+ {
+ topology.updateMember(uniqueEventID, nodeID, newMember);
+ }
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
*/
public void onConnection(ClientSessionFactoryInternal sf)
{
- TopologyMember localMember = manager.getLocalMember();
+ TopologyMember localMember = getLocalMember();
sf.sendNodeAnnounce(localMember.getUniqueEventID(),
manager.getNodeId(),
false,
@@ -498,9 +599,27 @@
}
backup = false;
+
+ topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
- serverLocator = clusterConnector.createServerLocator();
+ if (backupServerLocator != null)
+ {
+ // todo we could use the topology of this to preempt it arriving from the cc
+ try
+ {
+ backupServerLocator.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("problem closing backup session factory", e);
+ }
+ backupServerLocator = null;
+ }
+
+
+ serverLocator = clusterConnector.createServerLocator(true);
+
if (serverLocator != null)
{
@@ -509,7 +628,7 @@
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
}
- final TopologyMember currentMember = clusterManagerTopology.getMember(nodeUUID.toString());
+ final TopologyMember currentMember = topology.getMember(manager.getNodeId());
if (currentMember == null)
{
@@ -554,6 +673,7 @@
log.debug("sending notification: " + notification);
managementService.sendNotification(notification);
}
+
}
public TransportConfiguration getConnector()
@@ -660,7 +780,6 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
}
- log.info(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -701,7 +820,26 @@
}
}
}
+
+ public synchronized void informTopology()
+ {
+ String nodeID = server.getNodeID().toString();
+
+ TopologyMember localMember;
+
+ if (backup)
+ {
+ localMember = new TopologyMember(null, connector);
+ }
+ else
+ {
+ localMember = new TopologyMember(connector, null);
+ }
+ topology.updateAsLive(nodeID, localMember);
+ }
+
+
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
@@ -709,7 +847,7 @@
final Queue queue,
final boolean start) throws Exception
{
- final ServerLocatorInternal targetLocator = new ServerLocatorImpl(clusterManagerTopology, false, connector);
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
String nodeId;
@@ -1365,7 +1503,8 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" + nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1395,7 +1534,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator();
+ ServerLocatorInternal createServerLocator(boolean includeTopology);
}
private class StaticClusterConnector implements ClusterConnector
@@ -1407,7 +1546,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1415,7 +1554,9 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+ locator.setClusterConnection(true);
+ return locator;
}
else
{
@@ -1443,9 +1584,11 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
+ return locator;
+
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -30,15 +30,10 @@
import java.util.concurrent.ScheduledFuture;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -46,7 +41,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -83,6 +77,8 @@
private final PostOffice postOffice;
private final ScheduledExecutorService scheduledExecutor;
+
+ private ClusterConnection defaultClusterConnection;
private final ManagementService managementService;
@@ -99,10 +95,6 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
- private final Topology topology = new Topology(this);
-
- private volatile ServerLocatorInternal backupServerLocator;
-
private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
private final Executor executor;
@@ -126,8 +118,6 @@
executor = executorFactory.getExecutor();;
- topology.setExecutor(executor);
-
this.server = server;
this.postOffice = postOffice;
@@ -152,7 +142,6 @@
out.println("Information on " + this);
out.println("*******************************************************");
- out.println("Topology: " + topology.describe("Toopology on " + this));
for (ClusterConnection conn : this.clusterConnections.values())
{
@@ -163,29 +152,24 @@
return str.toString();
}
+
+ public ClusterConnection getDefaultConnection()
+ {
+ return defaultClusterConnection;
+ }
public String toString()
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}
- public TopologyMember getLocalMember()
- {
- return topology.getMember(nodeUUID.toString());
- }
-
public String getNodeId()
{
return nodeUUID.toString();
}
- public synchronized void start() throws Exception
+ public synchronized void deploy() throws Exception
{
- if (started)
- {
- return;
- }
-
if (clustered)
{
for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
@@ -193,43 +177,44 @@
deployBroadcastGroup(config);
}
- String connectorName = null;
-
for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
{
- if (connectorName == null)
- {
- connectorName = config.getConnectorName();
- break;
- }
- }
+ deployClusterConnection(config);
+ }
+ }
+ }
- if (connectorName != null)
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ for (BroadcastGroup group: broadcastGroups.values())
+ {
+ if (!backup)
{
- TransportConfiguration nodeConnector = configuration.getConnectorConfigurations().get(connectorName);
- if (nodeConnector == null)
- {
- log.warn("No connecor with name '" + connectorName +
- "'. The cluster connection will not be deployed.");
- return;
- }
-
- // Now announce presence
- announceNode(nodeConnector);
-
- for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
- {
- deployClusterConnection(config);
- }
+ group.start();
}
-
}
+
+ for (ClusterConnection conn : clusterConnections.values())
+ {
+ conn.start();
+ if (backup)
+ {
+ conn.informTopology();
+ conn.announceBackup();
+ }
+ }
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
{
- deployBridge(config);
+ deployBridge(config, !backup);
}
+
started = true;
}
@@ -267,12 +252,6 @@
}
bridges.clear();
-
- if (backupServerLocator != null)
- {
- backupServerLocator.close();
- backupServerLocator = null;
- }
}
for (ServerLocatorInternal clusterLocator : clusterLocators)
@@ -289,31 +268,9 @@
clusterLocators.clear();
started = false;
- clusterConnections.clear();
+ clearClusterConnections();
}
- public void nodeAnnounced(final long uniqueEventID,
- final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean backup)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
- }
-
- TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
- newMember.setUniqueEventID(uniqueEventID);
- if (backup)
- {
- topology.updateBackup(nodeID, new TopologyMember(connectorPair.a, connectorPair.b));
- }
- else
- {
- topology.updateMember(uniqueEventID, nodeID, newMember);
- }
- }
-
public void flushExecutor()
{
Future future = new Future();
@@ -350,24 +307,6 @@
return clusterConnections.get(name.toString());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
- {
- topology.addClusterTopologyListener(listener);
-
- // no need to use an executor here since the Topology is already using one
- topology.sendTopology(listener);
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
- {
- topology.removeClusterTopologyListener(listener);
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
// backup node becomes live
public synchronized void activate()
{
@@ -375,27 +314,6 @@
{
backup = false;
- String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
- // swap backup as live and send it to everybody
- member = new TopologyMember(member.getConnector().b, null);
- topology.updateAsLive(nodeID, member);
-
- if (backupServerLocator != null)
- {
- // todo we could use the topology of this to preempt it arriving from the cc
- try
- {
- backupServerLocator.close();
- }
- catch (Exception e)
- {
- log.warn("problem closing backup session factory", e);
- }
- backupServerLocator = null;
- }
-
for (BroadcastGroup broadcastGroup : broadcastGroups.values())
{
try
@@ -432,31 +350,15 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
- topology.sendMember(nodeID);
}
}
public void announceBackup() throws Exception
{
- List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
- if (!configs.isEmpty())
+ for (ClusterConnection conn : this.clusterConnections.values())
{
- ClusterConnectionConfiguration config = configs.get(0);
-
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
-
- if (connector == null)
- {
- log.warn("No connecor with name '" + config.getConnectorName() + "'. backup cannot be announced.");
- return;
- }
- announceBackup(config, connector);
+ conn.announceBackup();
}
- else
- {
- log.warn("no cluster connections defined, unable to announce backup");
- }
}
public void addClusterLocator(final ServerLocatorInternal serverLocator)
@@ -468,114 +370,9 @@
{
this.clusterLocators.remove(serverLocator);
}
-
- private synchronized void announceNode(final TransportConfiguration nodeConnector)
+
+ public synchronized void deployBridge(final BridgeConfiguration config, final boolean start) throws Exception
{
- String nodeID = server.getNodeID().toString();
-
- TopologyMember localMember;
- if (backup)
- {
- localMember = new TopologyMember(null, nodeConnector);
- }
- else
- {
- localMember = new TopologyMember(nodeConnector, null);
- }
-
- topology.updateAsLive(nodeID, localMember);
- }
-
- private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
- {
- if (broadcastGroups.containsKey(config.getName()))
- {
- ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
- " deployed. This one will not be deployed.");
-
- return;
- }
-
- InetAddress localAddress = null;
- if (config.getLocalBindAddress() != null)
- {
- localAddress = InetAddress.getByName(config.getLocalBindAddress());
- }
-
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
- BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
- config.getName(),
- localAddress,
- config.getLocalBindPort(),
- groupAddress,
- config.getGroupPort(),
- !backup);
-
- for (String connectorInfo : config.getConnectorInfos())
- {
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
-
- if (connector == null)
- {
- logWarnNoConnector(config.getName(), connectorInfo);
-
- return;
- }
-
- group.addConnector(connector);
- }
-
- ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
- 0L,
- config.getBroadcastPeriod(),
- MILLISECONDS);
-
- group.setScheduledFuture(future);
-
- broadcastGroups.put(config.getName(), group);
-
- managementService.registerBroadcastGroup(group, config);
-
- if (!backup)
- {
- group.start();
- }
- }
-
- private void logWarnNoConnector(final String connectorName, final String bgName)
- {
- ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
- "'. The broadcast group with name '" +
- bgName +
- "' will not be deployed.");
- }
-
- private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
- {
- TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- connectorNames.size());
- int count = 0;
- for (String connectorName : connectorNames)
- {
- TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
-
- if (connector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
- "'. The bridge will not be deployed.");
-
- return null;
- }
-
- tcConfigs[count++] = connector;
- }
-
- return tcConfigs;
- }
-
- public synchronized void deployBridge(final BridgeConfiguration config) throws Exception
- {
if (config.getName() == null)
{
ClusterManagerImpl.log.warn("Must specify a unique name for each bridge. This one will not be deployed.");
@@ -702,11 +499,12 @@
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
-
- if (!backup)
+
+ if (start)
{
bridge.start();
}
+
}
public void destroyBridge(final String name) throws Exception
@@ -726,11 +524,49 @@
bridge.flushExecutor();
}
- private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+ // for testing
+ public void clear()
{
+ for (Bridge bridge : bridges.values())
+ {
+ try
+ {
+ bridge.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ bridges.clear();
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ clearClusterConnections();
+ }
+
+ // Private methods ----------------------------------------------------------------------------------------------------
+
+
+ private void clearClusterConnections()
+ {
+ clusterConnections.clear();
+ this.defaultClusterConnection = null;
+ }
+
+ private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
+ {
if (config.getName() == null)
{
- ClusterManagerImpl.log.warn("Must specify a unique name for each cluster. This one will not be deployed.");
+ ClusterManagerImpl.log.warn("Must specify a unique name for each cluster connection. This one will not be deployed.");
return;
}
@@ -781,7 +617,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
dg,
connector,
new SimpleString(config.getName()),
@@ -819,7 +654,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
tcConfigs,
connector,
new SimpleString(config.getName()),
@@ -847,6 +681,11 @@
config.isAllowDirectConnectionsOnly());
}
+ if (defaultClusterConnection == null)
+ {
+ defaultClusterConnection = clusterConnection;
+ }
+
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
@@ -855,75 +694,8 @@
{
log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
- clusterConnection.start();
-
- if (backup)
- {
- announceBackup(config, connector);
- }
}
-
- private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
- {
- if (config.getStaticConnectors() != null)
- {
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
-
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
- "'. The cluster connection will not be deployed.");
- }
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else
- {
- return;
- }
- log.info("announcing backup");
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterManagerImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
- }
- ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- if (backupSessionFactory != null)
- {
- backupSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new NodeAnnounceMessage(System.currentTimeMillis(),
- nodeUUID.toString(),
- true,
- connector,
- null));
- log.info("backup announced");
- }
- }
- catch (Exception e)
- {
- log.warn("Unable to announce backup, retrying", e);
- }
- }
- });
- }
-
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
@@ -945,32 +717,89 @@
return transformer;
}
- // for testing
- public void clear()
+
+ private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
{
- for (Bridge bridge : bridges.values())
+ if (broadcastGroups.containsKey(config.getName()))
{
- try
+ ClusterManagerImpl.log.warn("There is already a broadcast-group with name " + config.getName() +
+ " deployed. This one will not be deployed.");
+
+ return;
+ }
+
+ InetAddress localAddress = null;
+ if (config.getLocalBindAddress() != null)
+ {
+ localAddress = InetAddress.getByName(config.getLocalBindAddress());
+ }
+
+ InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+ BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
+ config.getName(),
+ localAddress,
+ config.getLocalBindPort(),
+ groupAddress,
+ config.getGroupPort(),
+ !backup);
+
+ for (String connectorInfo : config.getConnectorInfos())
+ {
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorInfo);
+
+ if (connector == null)
{
- bridge.stop();
+ logWarnNoConnector(config.getName(), connectorInfo);
+
+ return;
}
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- }
+
+ group.addConnector(connector);
}
- bridges.clear();
- for (ClusterConnection clusterConnection : clusterConnections.values())
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+ 0L,
+ config.getBroadcastPeriod(),
+ MILLISECONDS);
+
+ group.setScheduledFuture(future);
+
+ broadcastGroups.put(config.getName(), group);
+
+ managementService.registerBroadcastGroup(group, config);
+ }
+
+ private void logWarnNoConnector(final String connectorName, final String bgName)
+ {
+ ClusterManagerImpl.log.warn("There is no connector deployed with name '" + connectorName +
+ "'. The broadcast group with name '" +
+ bgName +
+ "' will not be deployed.");
+ }
+
+ private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames)
+ {
+ TransportConfiguration[] tcConfigs = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ connectorNames.size());
+ int count = 0;
+ for (String connectorName : connectorNames)
{
- try
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
+
+ if (connector == null)
{
- clusterConnection.stop();
+ ClusterManagerImpl.log.warn("No connector defined with name '" + connectorName +
+ "'. The bridge will not be deployed.");
+
+ return null;
}
- catch (Exception e)
- {
- e.printStackTrace();
- }
+
+ tcConfigs[count++] = connector;
}
- clusterConnections.clear();
+
+ return tcConfigs;
}
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -30,8 +29,6 @@
void removeClusterLocator(ServerLocatorInternal locator);
- TopologyMember getLocalMember();
-
String getNodeId();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -227,7 +227,12 @@
// Used to identify the server on tests... useful on debugging testcases
private String identity;
+
+ private Thread backupActivationThread;
+ private Activation activation;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -289,11 +294,6 @@
// lifecycle methods
// ----------------------------------------------------------------
- private interface Activation extends Runnable
- {
- void close(boolean permanently) throws Exception;
- }
-
/*
* Can be overridden for tests
*/
@@ -309,259 +309,6 @@
}
}
- private class NoSharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- initialisePart1();
-
- initialisePart2();
-
- if (identity != null)
- {
- log.info("Server " + identity + " is now live");
- }
- else
- {
- log.info("Server is now live");
- }
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
-
- }
- }
-
- private class SharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- log.info("Waiting to obtain live lock");
-
- checkJournalDirectory();
-
- initialisePart1();
-
- if(nodeManager.isBackupLive())
- {
- //looks like we've failed over at some point need to inform that we are the backup so when the current live
- // goes down they failover to us
- clusterManager.announceBackup();
- Thread.sleep(configuration.getFailbackDelay());
- }
-
- nodeManager.startLiveNode();
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- log.info("Server is now live");
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
-
-
- private class SharedStoreBackupActivation implements Activation
- {
-
- volatile boolean closed = false;
- public void run()
- {
- try
- {
- nodeManager.startBackup();
-
- initialisePart1();
-
- clusterManager.start();
-
- started = true;
-
- log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
-
- nodeManager.awaitLiveNode();
-
- configuration.setBackup(false);
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- clusterManager.activate();
-
- log.info("Backup Server is now live");
-
- nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
- {
- class FailbackChecker implements Runnable
- {
- boolean restarting = false;
- public void run()
- {
- try
- {
- if(!restarting && nodeManager.isAwaitingFailback())
- {
- log.info("live server wants to restart, restarting server in backup");
- restarting = true;
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
- stop(true);
- // We need to wait some time before we start the backup again
- // otherwise we may eventually start before the live had a chance to get it
- Thread.sleep(configuration.getFailbackDelay());
- configuration.setBackup(true);
- log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
- start();
- }
- catch (Exception e)
- {
- log.warn("unable to restart server, please kill and restart manually", e);
- }
- }
- });
- t.start();
- }
- }
- catch (Exception e)
- {
- log.debug(e.getMessage(), e);
- //hopefully it will work next call
- }
- }
- }
- scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
- }
- }
- catch (InterruptedException e)
- {
- //this is ok, we are being stopped
- }
- catch (ClosedChannelException e)
- {
- //this is ok too, we are being stopped
- }
- catch (Exception e)
- {
- if(!(e.getCause() instanceof InterruptedException))
- {
- log.error("Failure in initialisation", e);
- }
- }
- catch(Throwable e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if (configuration.isBackup())
- {
- long timeout = 30000;
-
- long start = System.currentTimeMillis();
-
- while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
- {
- nodeManager.interrupt();
-
- backupActivationThread.interrupt();
-
- backupActivationThread.join(1000);
-
- }
-
- if (System.currentTimeMillis() - start >= timeout)
- {
- threadDump("Timed out waiting for backup activation to exit");
- }
-
- nodeManager.stopBackup();
- }
- else
- {
- //if we are now live, behave as live
- // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
- // started before the live
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
- }
-
- private class SharedNothingBackupActivation implements Activation
- {
- public void run()
- {
- try
- {
- // TODO
-
- // Try-Connect to live server using live-connector-ref
-
- // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- }
- }
-
- private Thread backupActivationThread;
-
- private Activation activation;
-
public synchronized void start() throws Exception
{
stopped = false;
@@ -611,6 +358,7 @@
}
+ // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
if (configuration.isBackup())
{
if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
return new HashSet<ServerSession>(sessions.values());
}
- // TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
return connectorsService;
}
- // Public
- // ---------------------------------------------------------------------------------------
+
+ public synchronized boolean checkActivate() throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ // Handle backup server activation
+ if (!configuration.isSharedStore())
+ {
+ if (replicationEndpoint == null)
+ {
+ HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
+ }
+
+ replicationEndpoint.stop();
+ }
+
+ // Complete the startup procedure
+
+ HornetQServerImpl.log.info("Activating backup server");
+
+ configuration.setBackup(false);
+
+ initialisePart2();
+ }
+
+ return true;
+ }
+
+ public void deployDivert(DivertConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getForwardingAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
+
+ return;
+ }
+
+ SimpleString sName = new SimpleString(config.getName());
+
+ if (postOffice.getBinding(sName) != null)
+ {
+ HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
+
+ return;
+ }
+
+ SimpleString sAddress = new SimpleString(config.getAddress());
+
+ Transformer transformer = instantiateTransformer(config.getTransformerClassName());
+
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+ sName,
+ new SimpleString(config.getRoutingName()),
+ config.isExclusive(),
+ filter,
+ transformer,
+ postOffice,
+ storageManager);
+
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerDivert(divert, config);
+ }
+
+ public void destroyDivert(SimpleString name) throws Exception
+ {
+ Binding binding = postOffice.getBinding(name);
+ if (binding == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
+ }
+ if (!(binding instanceof DivertBinding))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
+ }
+
+ postOffice.removeBinding(name);
+ }
+
+
+
+ public void deployBridge(BridgeConfiguration config) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.deployBridge(config, true);
+ }
+ }
+
+ public void destroyBridge(String name) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.destroyBridge(name);
+ }
+ }
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
+
+ // PUBLIC -------
+
+ public String toString()
+ {
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ }
+ }
+
+
+
// Package protected
// ----------------------------------------------------------------------------
@@ -1296,34 +1179,6 @@
// Private
// --------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // String backupConnectorName = configuration.getBackupConnectorName();
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
- // "' is not defined in the configuration.");
- // }
- // else
- // {
- //
- // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
- // replicationManager.start();
- // }
- // }
- //
- // return true;
- // }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
}
}
- public synchronized boolean checkActivate() throws Exception
- {
- if (configuration.isBackup())
- {
- // Handle backup server activation
-
- if (!configuration.isSharedStore())
- {
- if (replicationEndpoint == null)
- {
- HornetQServerImpl.log.warn("There is no replication endpoint, can't activate this backup server");
-
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
- }
-
- replicationEndpoint.stop();
- }
-
- // Complete the startup procedure
-
- HornetQServerImpl.log.info("Activating backup server");
-
- configuration.setBackup(false);
-
- initialisePart2();
- }
-
- return true;
- }
-
- private class FileActivateRunner implements Runnable
- {
- public void run()
- {
-
- }
- }
-
private void initialiseLogging()
{
LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1414,8 +1231,6 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration);
- remotingService = new RemotingServiceImpl(configuration, this, managementService, scheduledPool);
-
if (configuration.getMemoryMeasureInterval() != -1)
{
memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(),
@@ -1470,6 +1285,23 @@
configuration.isPersistIDCache(),
addressSettingsRepository);
+ // This can't be created until node id is set
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ this,
+ postOffice,
+ scheduledPool,
+ managementService,
+ configuration,
+ nodeManager.getUUID(),
+ configuration.isBackup(),
+ configuration.isClustered());
+
+
+ clusterManager.deploy();
+
+
+ remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
+
messagingServerControl = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -1527,18 +1359,6 @@
deploySecurityFromConfiguration();
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-
- // This can't be created until node id is set
- clusterManager = new ClusterManagerImpl(executorFactory,
- this,
- postOffice,
- scheduledPool,
- managementService,
- configuration,
- nodeManager.getUUID(),
- configuration.isBackup(),
- configuration.isClustered());
-
}
/*
@@ -1604,10 +1424,10 @@
// We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
// it is activated
- remotingService.start();
-
clusterManager.start();
+ remotingService.start();
+
initialised = true;
}
@@ -1826,76 +1646,6 @@
}
}
- public void deployDivert(DivertConfiguration config) throws Exception
- {
- if (config.getName() == null)
- {
- HornetQServerImpl.log.warn("Must specify a name for each divert. This one will not be deployed.");
-
- return;
- }
-
- if (config.getAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an address for each divert. This one will not be deployed.");
-
- return;
- }
-
- if (config.getForwardingAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an forwarding address for each divert. This one will not be deployed.");
-
- return;
- }
-
- SimpleString sName = new SimpleString(config.getName());
-
- if (postOffice.getBinding(sName) != null)
- {
- HornetQServerImpl.log.warn("Binding already exists with name " + sName + ", divert will not be deployed");
-
- return;
- }
-
- SimpleString sAddress = new SimpleString(config.getAddress());
-
- Transformer transformer = instantiateTransformer(config.getTransformerClassName());
-
- Filter filter = FilterImpl.createFilter(config.getFilterString());
-
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
- sName,
- new SimpleString(config.getRoutingName()),
- config.isExclusive(),
- filter,
- transformer,
- postOffice,
- storageManager);
-
- Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
-
- postOffice.addBinding(binding);
-
- managementService.registerDivert(divert, config);
- }
-
- public void destroyDivert(SimpleString name) throws Exception
- {
- Binding binding = postOffice.getBinding(name);
- if (binding == null)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for divert " + name);
- }
- if (!(binding instanceof DivertBinding))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding " + name + " is not a divert");
- }
-
- postOffice.removeBinding(name);
- }
-
-
private synchronized void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception
{
if (config != null)
@@ -1922,22 +1672,6 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
- public void deployBridge(BridgeConfiguration config) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.deployBridge(config);
- }
- }
-
- public void destroyBridge(String name) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.destroyBridge(name);
- }
- }
private Transformer instantiateTransformer(final String transformerClassName)
{
@@ -1979,11 +1713,6 @@
}
- public ServerSession getSessionByID(String sessionName)
- {
- return sessions.get(sessionName);
- }
-
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -2005,18 +1734,284 @@
}
}
- public String toString()
+ /**
+ * To be called by backup trying to fail back the server
+ */
+ private void startFailbackChecker()
{
- if (identity != null)
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
+ }
+
+
+ // Inner classes
+ // --------------------------------------------------------------------------------
+
+ class FailbackChecker implements Runnable
+ {
+ boolean restarting = false;
+ public void run()
{
- return "HornetQServerImpl::" + identity;
+ try
+ {
+ if(!restarting && nodeManager.isAwaitingFailback())
+ {
+ log.info("live server wants to restart, restarting server in backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ log.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback");
+ stop(true);
+ // We need to wait some time before we start the backup again
+ // otherwise we may eventually start before the live had a chance to get it
+ Thread.sleep(configuration.getFailbackDelay());
+ configuration.setBackup(true);
+ log.debug(HornetQServerImpl.this + "::Starting backup node now after failback");
+ start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to restart server, please kill and restart manually", e);
+ }
+ }
+ });
+ t.start();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
- else
+ }
+
+
+
+ private class SharedStoreLiveActivation implements Activation
+ {
+ public void run()
{
- return "HornetQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : "");
+ try
+ {
+ log.info("Waiting to obtain live lock");
+
+ checkJournalDirectory();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("First part initialization on " + this);
+ }
+
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ // Looks like we've failed over at some point; we need to announce that we are now the backup, so that when the
+ // current live goes down they fail over to us
+ if (log.isDebugEnabled())
+ {
+ log.debug("announcing backup to the former live" + this);
+ }
+
+ clusterManager.announceBackup();
+ Thread.sleep(configuration.getFailbackDelay());
+ }
+
+ nodeManager.startLiveNode();
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ log.info("Server is now live");
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
}
+
+ public void close(boolean permanently) throws Exception
+ {
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
}
- // Inner classes
- // --------------------------------------------------------------------------------
+
+ private class SharedStoreBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ nodeManager.startBackup();
+
+ initialisePart1();
+
+ clusterManager.start();
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "] started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ configuration.setBackup(false);
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ clusterManager.activate();
+
+ log.info("Backup Server is now live");
+
+ nodeManager.releaseBackup();
+ if(configuration.isAllowAutoFailBack())
+ {
+ startFailbackChecker();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ //this is ok, we are being stopped
+ }
+ catch (ClosedChannelException e)
+ {
+ //this is ok too, we are being stopped
+ }
+ catch (Exception e)
+ {
+ if(!(e.getCause() instanceof InterruptedException))
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+ catch(Throwable e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ /**
+ *
+ */
+ public void close(boolean permanently) throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ long timeout = 30000;
+
+ long start = System.currentTimeMillis();
+
+ while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
+ {
+ nodeManager.interrupt();
+
+ backupActivationThread.interrupt();
+
+ backupActivationThread.join(1000);
+
+ }
+
+ if (System.currentTimeMillis() - start >= timeout)
+ {
+ threadDump("Timed out waiting for backup activation to exit");
+ }
+
+ nodeManager.stopBackup();
+ }
+ else
+ {
+ //if we are now live, behave as live
+ // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
+ // started before the live
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+ }
+
+ private interface Activation extends Runnable
+ {
+ void close(boolean permanently) throws Exception;
+ }
+
+ private class SharedNothingBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ // TODO
+
+ // Try-Connect to live server using live-connector-ref
+
+ // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+ }
+ }
+
+ private class NoSharedStoreLiveActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ initialisePart1();
+
+ initialisePart2();
+
+ if (identity != null)
+ {
+ log.info("Server " + identity + " is now live");
+ }
+ else
+ {
+ log.info("Server is now live");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+
+ }
+ }
+
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -1062,7 +1062,7 @@
return;
}
-
+
consumer.receiveCredits(credits);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.protocol;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
@@ -26,7 +27,7 @@
*/
public interface ProtocolManager extends BufferDecoder
{
- ConnectionEntry createConnectionEntry(Connection connection);
+ ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
public void removeHandler(final String name);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
/**
@@ -31,6 +32,11 @@
void pause();
/**
+ * @return the cluster connection associated with this Acceptor
+ */
+ ClusterConnection getClusterConnection();
+
+ /**
* Set the notification service for this acceptor to use.
*
* @param notificationService the notification service
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -18,6 +18,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
+
/**
* A factory for creating acceptors.
* <p/>
@@ -40,7 +42,8 @@
* @param scheduledThreadPool a scheduled thread pool
* @return an acceptor
*/
- Acceptor createAcceptor(final Map<String, Object> configuration,
+ Acceptor createAcceptor(ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
BufferHandler handler,
BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -23,11 +23,13 @@
public interface ConnectionLifeCycleListener
{
/**
- * called when a connection is created.
+ * This method is used both by client connector creation and server connection creation through acceptors.
+ * The acceptor will be null for client operations.
*
+ * @param acceptor the acceptor used to create the connection; always null on a client connection created event.
* @param connection the connection that has been created
*/
- void connectionCreated(Connection connection, ProtocolType protocol);
+ void connectionCreated(Acceptor acceptor, Connection connection, ProtocolType protocol);
/**
* called when a connection is destroyed.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -940,10 +940,10 @@
for (ClusterConnection cc : clusterManager.getClusterConnections())
{
out += cc.describe() + "\n";
+ out += cc.getTopology().describe();
}
}
out += "\n\nfull topology:";
- out += clusterManager.getTopology().describe();
return out + br;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -131,10 +131,6 @@
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
-
- System.out.println(servers[0].getClusterManager().getTopology().describe());
-
- System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -153,17 +153,6 @@
waitForTopology(servers[1], 3);
waitForTopology(servers[2], 3);
- for (int i = 0 ; i < 3; i++)
- {
- System.out.println("top[" + i + "]=" + servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 2; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -196,12 +185,6 @@
startServers(0, 1);
- for (int i = 0; i <= 1; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -266,13 +249,6 @@
for (int i = 0; i <= 4; i++)
{
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 4; i++)
- {
setupSessionFactory(i, isNetty());
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -137,8 +137,6 @@
startServers(0, 1);
waitForTopology(servers[0], 2);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
- System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
for (int i = 0; i < 10; i++)
@@ -148,7 +146,6 @@
log.info("#stop #test #" + i);
stopServers(1);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
+ ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
ClientSession session = sendAndConsume(sf, true);
+
+ System.out.println(locator.getTopology().describe());
MyListener listener = new MyListener(latch);
session.addFailureListener(listener);
+
+ System.out.println(locator.getTopology().describe());
liveServer.crash();
-
+
assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ log.info("backup (nowLive) topology = " + backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+
+ log.info("Server Crash!!!");
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -97,6 +108,11 @@
producer.send(message);
+ verifyMessageOnServer(1, 1);
+
+ System.out.println(locator.getTopology().describe());
+
+
session.removeFailureListener(listener);
final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
log.info("******* starting live server back");
liveServer.start();
+
+ Thread.sleep(1000);
+
+ System.out.println("After failback: " + locator.getTopology().describe());
assertTrue(latch2.await(5, TimeUnit.SECONDS));
@@ -118,6 +138,8 @@
session.close();
+ verifyMessageOnServer(0, 1);
+
sf.close();
Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
Assert.assertEquals(0, sf.numConnections());
}
+ /**
+ * @throws Exception
+ * @throws HornetQException
+ */
+ private void verifyMessageOnServer(final int server, final int numberOfMessages) throws Exception, HornetQException
+ {
+ ServerLocator backupLocator = createInVMLocator(server);
+ ClientSessionFactory factorybkp = backupLocator.createSessionFactory();
+ ClientSession sessionbkp = factorybkp.createSession(false, false);
+ sessionbkp.start();
+ ClientConsumer consumerbkp = sessionbkp.createConsumer(ADDRESS);
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumerbkp.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ sessionbkp.commit();
+ }
+ sessionbkp.close();
+ factorybkp.close();
+ backupLocator.close();
+ }
+
public void testAutoFailbackThenFailover() throws Exception
{
locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
if (createQueue)
{
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
}
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
}
ClientMessage message3 = consumer.receiveImmediate();
+
+ consumer.close();
Assert.assertNull(message3);
@@ -315,6 +362,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ System.out.println("Failed, me");
latch.countDown();
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -12,16 +12,9 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
/**
@@ -167,6 +160,7 @@
closeSessionFactory(0);
Thread.sleep(1000);
+
servers[0].stop(true);
waitForServerRestart(2);
@@ -213,16 +207,4 @@
abstract boolean isSharedServer();
- private void fail(final RemotingConnection conn, final CountDownLatch latch) throws InterruptedException
- {
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -193,7 +193,7 @@
{
if (server != null)
{
- log.info("failed topology, Topology on server = " + server.getClusterManager().getTopology().describe());
+ log.info("failed topology, Topology on server = " + server.getClusterManager().describe());
}
}
assertTrue("expected " + topologyMembers + " members", ok);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -98,8 +98,6 @@
Thread.sleep(500);
servers.get(0).crash(session);
- System.out.println("server3 " + servers.get(3).getServer().getClusterManager().getTopology().describe());
-
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -29,6 +29,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -544,7 +545,7 @@
latch = connCreatedLatch;
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
this.connection = connection;
if (latch != null)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -89,7 +89,7 @@
*/
public MockConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
- super(serverID, handler, listener, Executors.newSingleThreadExecutor());
+ super(null, serverID, handler, listener, Executors.newSingleThreadExecutor());
}
@Override
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -63,7 +63,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
@@ -74,7 +74,8 @@
};
- Acceptor acceptor = factory.createAcceptor(params,
+ Acceptor acceptor = factory.createAcceptor(null,
+ params,
handler,
null,
listener,
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
import org.hornetq.core.remoting.impl.netty.NettyAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -80,7 +81,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnection;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.tests.util.RandomUtil;
@@ -220,7 +221,7 @@
class MyListener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -66,7 +67,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
public void connectionReadyForWrites(Object connectionID, boolean ready)
@@ -106,7 +107,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-19 23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-20 00:50:32 UTC (rev 11371)
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.management.MBeanServer;
@@ -37,11 +38,13 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -112,8 +115,15 @@
log.debug("waiting for " + nodes + " on the topology for server = " + server);
long start = System.currentTimeMillis();
+
+ Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
+
+ if (ccs.size() != 1)
+ {
+ throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
+ }
- Topology topology = server.getClusterManager().getTopology();
+ Topology topology = ccs.iterator().next().getTopology();
do
{
@@ -521,7 +531,19 @@
locators.add(locatorWithoutHA);
return locatorWithoutHA;
}
+
+ protected ServerLocator createInVMLocator(final int serverID)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (serverID != 0)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+ }
+
+ return HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+ }
+
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));
13 years, 3 months