Author: clebert.suconic(a)jboss.com
Date: 2011-09-20 19:57:01 -0400 (Tue, 20 Sep 2011)
New Revision: 11379
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
JBPAPP-7230 & JBPAPP-7229 - ClientID fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-20
16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/HornetQException.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -135,6 +135,12 @@
public static final int DUPLICATE_ID_REJECTED = 113;
+ /**
+ * A Session Metadata was set in duplication
+ */
+ public static final int DUPLICATE_METADATA = 114;
+
+
// Native Error codes ----------------------------------------------
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -569,6 +569,12 @@
void addMetaData(String key, String data) throws HornetQException;
/**
+ * Attach any metadata to the session. Throws an exception if there's already a
metadata available
+ * @throws HornetQException
+ */
+ void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
* Attach any metadata to the session.
* Sends a Metadata using the older version
* @deprecated Use {@link ClientSession#addMetaData(String, String)}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -65,6 +65,7 @@
import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1146,6 +1147,11 @@
}
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
+
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
+ }
public ClientSessionFactoryInternal getSessionFactory()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -593,4 +593,13 @@
return session.getChannel();
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String,
java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ session.addUniqueMetaData(key, data);
+
+ }
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -79,6 +79,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+ {
+ SessionUniqueAddMetaDataMessage message =
(SessionUniqueAddMetaDataMessage)packet;
+ if (session.addUniqueMetaData(message.getKey(), message.getData()))
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new
HornetQException(HornetQException.DUPLICATE_METADATA,
+ "Metadata " +
message.getKey() +
+ "=" +
+ message.getData() +
+ " had been set
already"));
+ }
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -44,6 +44,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -141,6 +142,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -538,6 +540,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case SESS_UNIQUE_ADD_METADATA:
+ {
+ packet = new SessionUniqueAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -181,13 +181,17 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC = 103;
-
- // HA
public static final byte SESS_ADD_METADATA = 104;
public static final byte SESS_ADD_METADATA2 = 105;
+ public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+
+
+ // HA
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
@@ -200,6 +204,7 @@
public static final byte CLUSTER_TOPOLOGY_V2 = 114;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -30,7 +30,7 @@
private String key;
private String data;
/**
- * It won require confirmation during failover / reconnect
+ * It's not required confirmation during failover / reconnect
*/
private boolean requiresConfirmation = true;
@@ -39,6 +39,11 @@
super(PacketImpl.SESS_ADD_METADATA2);
}
+ protected SessionAddMetaDataMessageV2(byte packetCode)
+ {
+ super(packetCode);
+ }
+
public SessionAddMetaDataMessageV2(String k, String d)
{
this();
@@ -46,6 +51,13 @@
data = d;
}
+ protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+ {
+ super(packetCode);
+ key = k;
+ data = d;
+ }
+
public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
{
this();
Added:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
(rev 0)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessageV2
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionUniqueAddMetaDataMessage()
+ {
+ super(SESS_UNIQUE_ADD_METADATA);
+ }
+
+
+ public SessionUniqueAddMetaDataMessage(String key, String data)
+ {
+ super(SESS_UNIQUE_ADD_METADATA, key, data);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -65,7 +65,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RemotingServiceImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000;
@@ -95,7 +95,7 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new
ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -113,7 +113,7 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
-
+
this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
@@ -136,13 +136,13 @@
this.scheduledThreadPool = scheduledThreadPool;
- this.protocolMap.put(ProtocolType.CORE, new
CoreProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
+ this.protocolMap.put(ProtocolType.CORE,
+ new CoreProtocolManagerFactory().createProtocolManager(server,
interceptors));
// difference between Stomp and Stomp over Web Sockets is handled in
NettyAcceptor.getPipeline()
- this.protocolMap.put(ProtocolType.STOMP, new
StompProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new
StompProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
+ this.protocolMap.put(ProtocolType.STOMP,
+ new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS,
+ new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -168,9 +168,9 @@
// This needs to be a different thread pool to the main thread pool especially for
OIO where we may need
// to support many hundreds of connections, but the main thread pool must be kept
small for better performance
- ThreadFactory tFactory = new
HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() +
"-" + System.identityHashCode(this),
- false,
- tccl);
+ ThreadFactory tFactory = new
HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() +
+ "-" +
+ System.identityHashCode(this),
false, tccl);
threadPool = Executors.newCachedThreadPool(tFactory);
@@ -275,7 +275,6 @@
}
failureCheckAndFlushThread.close();
-
// We need to stop them accepting first so no new connections are accepted after we
send the disconnect message
for (Acceptor acceptor : acceptors)
@@ -297,7 +296,7 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
if (log.isTraceEnabled())
{
log.trace("Sending connection.disconnection packet to " + conn);
@@ -398,23 +397,23 @@
{
log.trace("Connection created " + connection);
}
-
+
connections.put(connection.getID(), entry);
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
- if (isTrace)
- {
- log.trace("Connection removed " + connectionID + " from server
" + this.server, new Exception ("trace"));
- }
-
+ if (isTrace)
+ {
+ log.trace("Connection removed " + connectionID + " from server
" + this.server, new Exception("trace"));
+ }
+
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
@@ -459,7 +458,7 @@
// Connections should only fail when TTL is exceeded
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -496,10 +495,10 @@
}
else
{
- if (log.isTraceEnabled())
- {
- log.trace("ConnectionID = " + connectionID + " was already
closed, so ignoring packet");
- }
+ if (log.isTraceEnabled())
+ {
+ log.trace("ConnectionID = " + connectionID + " was already
closed, so ignoring packet");
+ }
}
}
}
@@ -540,76 +539,85 @@
{
while (!closed)
{
- long now = System.currentTimeMillis();
+ try
+ {
+ long now = System.currentTimeMillis();
- Set<Object> idsToRemove = new HashSet<Object>();
+ Set<Object> idsToRemove = new HashSet<Object>();
- for (ConnectionEntry entry : connections.values())
- {
- RemotingConnection conn = entry.connection;
+ for (ConnectionEntry entry : connections.values())
+ {
+ RemotingConnection conn = entry.connection;
- boolean flush = true;
+ boolean flush = true;
- if (entry.ttl != -1)
- {
- if (now >= entry.lastCheck + entry.ttl)
+ if (entry.ttl != -1)
{
- if (!conn.checkDataReceived())
+ if (now >= entry.lastCheck + entry.ttl)
{
- idsToRemove.add(conn.getID());
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
- flush = false;
+ flush = false;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
}
- else
- {
- entry.lastCheck = now;
- }
}
+
+ if (flush)
+ {
+ conn.flush();
+ }
}
- if (flush)
+ for (Object id : idsToRemove)
{
- conn.flush();
+ RemotingConnection conn = removeConnection(id);
+ if (conn != null)
+ {
+ HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Did not receive
data from " + conn.getRemoteAddress() +
+ ". It is
likely the client has exited or crashed without " +
+ "closing
its connection, or the network between the server and client has failed. " +
+ "You also
might have configured connection-ttl and client-failure-check-period incorrectly. "
+
+ "Please
check user manual for more information." +
+ " The
connection will now be closed.");
+ conn.fail(me);
+ }
}
- }
- for (Object id : idsToRemove)
- {
- RemotingConnection conn = removeConnection(id);
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from
" + conn.getRemoteAddress() +
- ". It is likely
the client has exited or crashed without " +
- "closing its
connection, or the network between the server and client has failed. " +
- "You also might
have configured connection-ttl and client-failure-check-period incorrectly. " +
- "Please check
user manual for more information." +
- " The connection
will now be closed.");
- conn.fail(me);
- }
+ long start = System.currentTimeMillis();
- synchronized (this)
- {
- long toWait = pauseInterval;
-
- long start = System.currentTimeMillis();
-
- while (!closed && toWait > 0)
- {
- try
+ while (!closed && toWait > 0)
{
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-20
16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -132,6 +132,9 @@
List<ServerSession> getSessions(String connectionID);
+ /** will return true if there is any session wth this key */
+ boolean lookupSession(String metakey, String metavalue);
+
ClusterManager getClusterManager();
SimpleString getNodeID();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-20
16:40:18 UTC (rev 11378)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ServerSession.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -121,6 +121,8 @@
void addMetaData(String key, String data);
+ boolean addUniqueMetaData(String key, String data);
+
String getMetaData(String key);
String[] getTargetAddresses();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -797,6 +797,23 @@
sessions.remove(name);
}
+ public boolean lookupSession(String key, String value)
+ {
+ // getSessions is called here in a try to minimize locking the Server while this
check is being done
+ Set<ServerSession> allSessions = getSessions();
+
+ for (ServerSession session : allSessions)
+ {
+ String metaValue = session.getMetaData(key);
+ if (metaValue != null && metaValue.equals(value))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries =
sessions.entrySet();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -1194,6 +1194,21 @@
metaData.put(key, data);
}
+
+ public boolean addUniqueMetaData(String key, String data)
+ {
+ if (server.lookupSession(key, data))
+ {
+ // There is a duplication of this property
+ return false;
+ }
+ else
+ {
+ addMetaData(key, data);
+ return true;
+ }
+ }
+
public String getMetaData(String key)
{
String data = null;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -181,6 +181,18 @@
{
throw new IllegalStateException("setClientID can only be called directly
after the connection is created");
}
+
+ try
+ {
+ initialSession.addUniqueMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+ {
+ throw new IllegalStateException("clientID=" + clientID + " was
already set into another connection");
+ }
+ }
this.clientID = clientID;
try
Modified:
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -88,8 +88,21 @@
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+
+ Connection connection2 = JMSTest.cf.createConnection();
+ try
+ {
+ connection2.setClientID(clientID);
+ fail("setClientID was expected to throw an exception");
+ }
+ catch (JMSException e)
+ {
+ // expected
+ }
connection.close();
+
+ connection2.setClientID(clientID);
}
public void testSetClientAfterStart() throws Exception
Modified:
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -1417,6 +1417,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String,
java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -1666,8 +1666,8 @@
null,
groupAddress,
port,
- 5000,
- 5000);
+ 1000,
+ 1000);
configuration.getDiscoveryGroupConfigurations().put(dcConfig.getName(), dcConfig);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-20
16:40:18 UTC (rev 11378)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-09-20
23:57:01 UTC (rev 11379)
@@ -420,7 +420,7 @@
locator.close();
}
-
+
public void testMultipleClientSessionFactories() throws Throwable
{
startServers(0, 1, 2, 3, 4);