Author: clebert.suconic(a)jboss.com
Date: 2011-09-21 13:56:19 -0400 (Wed, 21 Sep 2011)
New Revision: 11387
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
Log:
JBPAPP-7242 - Back porting JBPAPP-7230 & JBPAPP-7229 - ClientID fixes
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -135,6 +135,12 @@
public static final int DUPLICATE_ID_REJECTED = 113;
+ /**
+ * A session metadata value was set that is already held by an existing session
+ */
+ public static final int DUPLICATE_METADATA = 114;
+
+
// Native Error codes ----------------------------------------------
/**
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -569,6 +569,15 @@
void addMetaData(String key, String data) throws HornetQException;
/**
+ * Attach any metadata to the session. Throws an exception if a session already
holds the same metadata.
+ * You can use this metadata to ensure that there is no other session with the same
metadata you are passing as an argument.
+ * This is useful to simulate unique client-ids, where you may want to avoid multiple
instances of your client application being connected.
+ *
+ * @throws HornetQException
+ */
+ void addUniqueMetaData(String key, String data) throws HornetQException;
+
+ /**
* Attach any metadata to the session.
* Sends a Metadata using the older version
* @deprecated Use {@link ClientSession#addMetaData(String, String)}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -64,6 +64,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1101,6 +1102,11 @@
metadata.put(key, data);
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
+
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
+ }
public ClientSessionFactoryInternal getSessionFactory()
{
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -578,4 +578,12 @@
{
return session.isCompressLargeMessages();
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String,
java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ session.addUniqueMetaData(key, data);
+ }
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -79,6 +79,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
session.addMetaData(message.getKey(), message.getData());
break;
}
+ case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+ {
+ SessionUniqueAddMetaDataMessage message =
(SessionUniqueAddMetaDataMessage)packet;
+ if (session.addUniqueMetaData(message.getKey(), message.getData()))
+ {
+ response = new NullResponseMessage();
+ }
+ else
+ {
+ response = new HornetQExceptionMessage(new
HornetQException(HornetQException.DUPLICATE_METADATA,
+ "Metadata " +
message.getKey() +
+ "=" +
+ message.getData() +
+ " had been set
already"));
+ }
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -43,6 +43,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -138,6 +139,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -524,6 +526,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case SESS_UNIQUE_ADD_METADATA:
+ {
+ packet = new SessionUniqueAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -181,19 +181,22 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
public static final byte REPLICATION_SYNC = 103;
-
- // HA
public static final byte SESS_ADD_METADATA = 104;
public static final byte SESS_ADD_METADATA2 = 105;
+ public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+ // HA
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -30,7 +30,7 @@
private String key;
private String data;
/**
- * It won require confirmation during failover / reconnect
+ * Confirmation is not required during failover / reconnect
*/
private boolean requiresConfirmation = true;
@@ -39,6 +39,11 @@
super(PacketImpl.SESS_ADD_METADATA2);
}
+ protected SessionAddMetaDataMessageV2(byte packetCode)
+ {
+ super(packetCode);
+ }
+
public SessionAddMetaDataMessageV2(String k, String d)
{
this();
@@ -46,6 +51,13 @@
data = d;
}
+ protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+ {
+ super(packetCode);
+ key = k;
+ data = d;
+ }
+
public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
{
this();
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
(rev 0)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionUniqueAddMetaDataMessage()
+ {
+ super(SESS_UNIQUE_ADD_METADATA);
+ }
+
+
+ public SessionUniqueAddMetaDataMessage(String key, String data)
+ {
+ super(SESS_UNIQUE_ADD_METADATA, key, data);
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -128,13 +128,13 @@
this.scheduledThreadPool = scheduledThreadPool;
- this.protocolMap.put(ProtocolType.CORE, new
CoreProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
+ this.protocolMap.put(ProtocolType.CORE,
+ new CoreProtocolManagerFactory().createProtocolManager(server,
interceptors));
// difference between Stomp and Stomp over Web Sockets is handled in
NettyAcceptor.getPipeline()
- this.protocolMap.put(ProtocolType.STOMP, new
StompProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
- this.protocolMap.put(ProtocolType.STOMP_WS, new
StompProtocolManagerFactory().createProtocolManager(server,
-
interceptors));
+ this.protocolMap.put(ProtocolType.STOMP,
+ new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP_WS,
+ new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -374,9 +374,9 @@
if (config.isBackup())
{
serverSideReplicatingConnection = entry.connection;
- }
+ }
}
-
+
public void connectionDestroyed(final Object connectionID)
{
ConnectionEntry conn = connections.get(connectionID);
@@ -423,7 +423,7 @@
// Connections should only fail when TTL is exceeded
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -497,76 +497,85 @@
{
while (!closed)
{
- long now = System.currentTimeMillis();
+ try
+ {
+ long now = System.currentTimeMillis();
- Set<Object> idsToRemove = new HashSet<Object>();
+ Set<Object> idsToRemove = new HashSet<Object>();
- for (ConnectionEntry entry : connections.values())
- {
- RemotingConnection conn = entry.connection;
+ for (ConnectionEntry entry : connections.values())
+ {
+ RemotingConnection conn = entry.connection;
- boolean flush = true;
+ boolean flush = true;
- if (entry.ttl != -1)
- {
- if (now >= entry.lastCheck + entry.ttl)
+ if (entry.ttl != -1)
{
- if (!conn.checkDataReceived())
+ if (now >= entry.lastCheck + entry.ttl)
{
- idsToRemove.add(conn.getID());
+ if (!conn.checkDataReceived())
+ {
+ idsToRemove.add(conn.getID());
- flush = false;
+ flush = false;
+ }
+ else
+ {
+ entry.lastCheck = now;
+ }
}
- else
- {
- entry.lastCheck = now;
- }
}
+
+ if (flush)
+ {
+ conn.flush();
+ }
}
- if (flush)
+ for (Object id : idsToRemove)
{
- conn.flush();
+ RemotingConnection conn = removeConnection(id);
+ if (conn != null)
+ {
+ HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Did not receive
data from " + conn.getRemoteAddress() +
+ ". It is
likely the client has exited or crashed without " +
+ "closing
its connection, or the network between the server and client has failed. " +
+ "You also
might have configured connection-ttl and client-failure-check-period incorrectly. "
+
+ "Please
check user manual for more information." +
+ " The
connection will now be closed.");
+ conn.fail(me);
+ }
}
- }
- for (Object id : idsToRemove)
- {
- RemotingConnection conn = removeConnection(id);
+ synchronized (this)
+ {
+ long toWait = pauseInterval;
- HornetQException me = new
HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Did not receive data from
" + conn.getRemoteAddress() +
- ". It is likely
the client has exited or crashed without " +
- "closing its
connection, or the network between the server and client has failed. " +
- "You also might
have configured connection-ttl and client-failure-check-period incorrectly. " +
- "Please check
user manual for more information." +
- " The connection
will now be closed.");
- conn.fail(me);
- }
+ long start = System.currentTimeMillis();
- synchronized (this)
- {
- long toWait = pauseInterval;
-
- long start = System.currentTimeMillis();
-
- while (!closed && toWait > 0)
- {
- try
+ while (!closed && toWait > 0)
{
- wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
- now = System.currentTimeMillis();
+ now = System.currentTimeMillis();
- toWait -= now - start;
+ toWait -= now - start;
- start = now;
+ start = now;
+ }
}
}
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
}
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -126,6 +126,9 @@
List<ServerSession> getSessions(String connectionID);
+ /** will return true if there is any session with this metadata key set to this value */
+ boolean lookupSession(String metakey, String metavalue);
+
ClusterManager getClusterManager();
SimpleString getNodeID();
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -124,6 +124,8 @@
void addMetaData(String key, String data);
+ boolean addUniqueMetaData(String key, String data);
+
String getMetaData(String key);
String[] getTargetAddresses();
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -966,6 +966,23 @@
sessions.remove(name);
}
+ public boolean lookupSession(String key, String value)
+ {
+ // getSessions is called here in an attempt to minimize locking the Server while this
check is being done
+ Set<ServerSession> allSessions = getSessions();
+
+ for (ServerSession session : allSessions)
+ {
+ String metaValue = session.getMetaData(key);
+ if (metaValue != null && metaValue.equals(value))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries =
sessions.entrySet();
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -1173,6 +1173,21 @@
metaData.put(key, data);
}
+
+ public boolean addUniqueMetaData(String key, String data)
+ {
+ if (server.lookupSession(key, data))
+ {
+ // There is a duplication of this property
+ return false;
+ }
+ else
+ {
+ addMetaData(key, data);
+ return true;
+ }
+ }
+
public String getMetaData(String key)
{
String data = null;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -181,6 +181,18 @@
{
throw new IllegalStateException("setClientID can only be called directly
after the connection is created");
}
+
+ try
+ {
+ initialSession.addUniqueMetaData("jms-client-id", clientID);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+ {
+ throw new IllegalStateException("clientID=" + clientID + " was
already set into another connection");
+ }
+ }
this.clientID = clientID;
try
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -88,8 +88,21 @@
connection.setClientID(clientID);
ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+
+ Connection connection2 = JMSTest.cf.createConnection();
+ try
+ {
+ connection2.setClientID(clientID);
+ fail("setClientID was expected to throw an exception");
+ }
+ catch (JMSException e)
+ {
+ // expected
+ }
connection.close();
+
+ connection2.setClientID(clientID);
}
public void testSetClientAfterStart() throws Exception
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -1417,6 +1417,15 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String,
java.lang.String)
+ */
+ public void addUniqueMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21
17:06:02 UTC (rev 11386)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2011-09-21
17:56:19 UTC (rev 11387)
@@ -98,7 +98,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2",
subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.getSubscriptionCount());
@@ -118,7 +118,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"_2",
subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -145,7 +145,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2");
+ TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID + "2", subscriptionName + "2");
TopicControl topicControl = createManagementControl();
Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
@@ -171,7 +171,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2",
subscriptionName + "2");
TopicControl topicControl = createManagementControl();
String jsonString = topicControl.listDurableSubscriptionsAsJSON();
@@ -179,7 +179,7 @@
Assert.assertEquals(2, infos.length);
Assert.assertEquals(clientID, infos[0].getClientID());
Assert.assertEquals(subscriptionName, infos[0].getName());
- Assert.assertEquals(clientID, infos[1].getClientID());
+ Assert.assertEquals(clientID+"2", infos[1].getClientID());
Assert.assertEquals(subscriptionName + "2", infos[1].getName());
jsonString = topicControl.listNonDurableSubscriptionsAsJSON();
@@ -344,7 +344,7 @@
Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName +
"2");
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID+"2",
subscriptionName + "2");
JMSUtil.sendMessages(topic, 3);
@@ -438,7 +438,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2",
subscriptionName + "2");
TopicControl topicControl = createManagementControl();
@@ -460,7 +460,7 @@
Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
- MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID+"2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
TopicControl topicControl = createManagementControl();