JBoss hornetq SVN: r11733 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-11-21 18:09:13 -0500 (Mon, 21 Nov 2011)
New Revision: 11733
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-6030 - Avoiding distributed deadlock on receiveImmediate()
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
@@ -398,9 +398,15 @@
{
checkClosed();
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
-
- channel.send(request);
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+ channel.send(request);
+ }
+ });
}
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
@@ -389,33 +389,30 @@
{
promptDelivery();
- Future future = new Future();
-
- messageQueue.getExecutor().execute(future);
-
- boolean ok = future.await(10000);
-
- if (!ok)
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ messageQueue.getExecutor().execute(new Runnable()
{
- log.warn("Timed out waiting for executor");
- }
+ public void run()
+ {
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery message is written after
+ // any delivery is completed
- try
- {
- // We execute this on the same executor to make sure the force delivery message is written after
- // any delivery is completed
+ ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
- ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
+ forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
- forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
+ }
+ }
+ });
- callback.sendMessage(forcedDeliveryMessage, id, 0);
- }
- catch (Exception e)
- {
- ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
- }
}
public LinkedList<MessageReference> cancelRefs(final boolean failed,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21 20:43:40 UTC (rev 11732)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-11-21 23:09:13 UTC (rev 11733)
@@ -569,7 +569,11 @@
{
ServerConsumer consumer = consumers.get(consumerID);
- consumer.forceDelivery(sequence);
+ // this would be possible if the server consumer was closed by pings/pongs.. etc
+ if (consumer != null)
+ {
+ consumer.forceDelivery(sequence);
+ }
}
public void acknowledge(final long consumerID, final long messageID) throws Exception
13 years, 1 month
JBoss hornetq SVN: r11732 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 15:43:40 -0500 (Mon, 21 Nov 2011)
New Revision: 11732
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
Stomp to throw exception on nonexistent destination - HORNETQ-799
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 20:42:38 UTC (rev 11731)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 20:43:40 UTC (rev 11732)
@@ -53,6 +53,10 @@
{
// Constants -----------------------------------------------------
+ private static final SimpleString JMS_TOPIC_PREFIX = new SimpleString("jms.topic");
+
+ private static final SimpleString JMS_QUEUE_PREFIX = new SimpleString("jms.queue");
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
// TODO use same value than HornetQConnection
@@ -588,7 +592,7 @@
*/
private void validateDestination(SimpleString address) throws Exception, HornetQException
{
- if (!address.equals(managementAddress))
+ if ((address.startsWith(JMS_QUEUE_PREFIX) || address.startsWith(JMS_TOPIC_PREFIX)) && !address.equals(managementAddress))
{
Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
if (binding == null || binding.getBindings().size() == 0)
13 years, 1 month
JBoss hornetq SVN: r11731 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 15:42:38 -0500 (Mon, 21 Nov 2011)
New Revision: 11731
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
Stomp to throw exception on nonexistent destination - HORNETQ-799
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 15:01:40 UTC (rev 11730)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 20:42:38 UTC (rev 11731)
@@ -53,6 +53,10 @@
{
// Constants -----------------------------------------------------
+ private static final SimpleString JMS_TOPIC_PREFIX = new SimpleString("jms.topic");
+
+ private static final SimpleString JMS_QUEUE_PREFIX = new SimpleString("jms.queue");
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
// TODO use same value than HornetQConnection
@@ -588,7 +592,7 @@
*/
private void validateDestination(SimpleString address) throws Exception, HornetQException
{
- if (!address.equals(managementAddress))
+ if ((address.startsWith(JMS_QUEUE_PREFIX) || address.startsWith(JMS_TOPIC_PREFIX)) && !address.equals(managementAddress))
{
Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
if (binding == null || binding.getBindings().size() == 0)
13 years, 1 month
JBoss hornetq SVN: r11730 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 10:01:40 -0500 (Mon, 21 Nov 2011)
New Revision: 11730
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
Stomp to throw exception on nonexistent destination - HORNETQ-799
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-11-21 15:00:36 UTC (rev 11729)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-11-21 15:01:40 UTC (rev 11730)
@@ -1061,6 +1061,29 @@
sendFrame(frame);
}
+ public void testSubscribeToInvalidTopic() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName()+"IDontExist" +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("ERROR"));
+ }
+
public void testDurableSubscriberWithReconnection() throws Exception
{
13 years, 1 month
JBoss hornetq SVN: r11729 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 10:00:36 -0500 (Mon, 21 Nov 2011)
New Revision: 11729
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
Log:
just removing unused import
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-11-21 15:00:10 UTC (rev 11728)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-11-21 15:00:36 UTC (rev 11729)
@@ -16,7 +16,6 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.server.HornetQComponent;
13 years, 1 month
JBoss hornetq SVN: r11728 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 10:00:10 -0500 (Mon, 21 Nov 2011)
New Revision: 11728
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
Stomp to throw exception on nonexistent destination - HORNETQ-799
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 14:56:37 UTC (rev 11727)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 15:00:10 UTC (rev 11728)
@@ -61,6 +61,8 @@
// Attributes ----------------------------------------------------
private final HornetQServer server;
+
+ private final SimpleString managementAddress;
private final Executor executor;
@@ -106,6 +108,7 @@
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
+ this.managementAddress = server.getConfiguration().getManagementAddress();
this.executor = server.getExecutorFactory().getExecutor();
}
@@ -341,6 +344,8 @@
}
subscriptionID = "subscription/" + destination;
}
+
+ validateDestination(new SimpleString(destination));
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID))
@@ -583,10 +588,15 @@
*/
private void validateDestination(SimpleString address) throws Exception, HornetQException
{
- Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
- if (binding == null || binding.getBindings().size() == 0)
+ if (!address.equals(managementAddress))
{
- throw new HornetQException(HornetQException.ADDRESS_DOES_NOT_EXIST, "Address " + address + " has not been deployed");
+ Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
+ if (binding == null || binding.getBindings().size() == 0)
+ {
+ throw new HornetQException(HornetQException.ADDRESS_DOES_NOT_EXIST, "Address " + address +
+ " has not been deployed");
+ }
+
}
}
13 years, 1 month
JBoss hornetq SVN: r11727 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 09:56:37 -0500 (Mon, 21 Nov 2011)
New Revision: 11727
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://issues.jboss.org/browse/HORNETQ-798 - avoiding synchronization on Queue while doing IO on paging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-11-21 14:54:27 UTC (rev 11726)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-11-21 14:56:37 UTC (rev 11727)
@@ -1612,166 +1612,168 @@
// This method will deliver as many messages as possible until all consumers are busy or there are no more matching
// or available messages
- private synchronized void deliver()
+ private void deliver()
{
- if (paused || consumerList.isEmpty())
+ synchronized (this)
{
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
- }
+ if (paused || consumerList.isEmpty())
+ {
+ return;
+ }
- int busyCount = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+ }
- int nullRefCount = 0;
+ int busyCount = 0;
- int size = consumerList.size();
+ int nullRefCount = 0;
- int endPos = pos == size - 1 ? 0 : size - 1;
+ int size = consumerList.size();
- int numRefs = messageReferences.size();
+ int endPos = pos == size - 1 ? 0 : size - 1;
- int handled = 0;
-
- long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+ int numRefs = messageReferences.size();
- while (handled < numRefs)
- {
- if (handled == MAX_DELIVERIES_IN_LOOP)
- {
- // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too long
+ int handled = 0;
- deliverAsync();
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
- return;
- }
-
- if (System.currentTimeMillis() > timeout)
+ while (handled < numRefs)
{
- if (isTrace)
+ if (handled == MAX_DELIVERIES_IN_LOOP)
{
- log.trace("delivery has been running for too long. Scheduling another delivery task now");
- }
-
- deliverAsync();
-
- return;
- }
-
+ // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
+ // long
- ConsumerHolder holder = consumerList.get(pos);
+ deliverAsync();
- Consumer consumer = holder.consumer;
+ return;
+ }
- if (holder.iter == null)
- {
- holder.iter = messageReferences.iterator();
- }
-
- MessageReference ref;
-
- if (holder.iter.hasNext())
- {
- ref = holder.iter.next();
- }
- else
- {
- ref = null;
- }
-
-
- if (ref == null)
- {
- nullRefCount++;
- }
- else
- {
- if (checkExpired(ref))
+ if (System.currentTimeMillis() > timeout)
{
if (isTrace)
{
- log.trace("Reference " + ref + " being expired");
+ log.trace("delivery has been running for too long. Scheduling another delivery task now");
}
- holder.iter.remove();
- refRemoved(ref);
-
- handled++;
+ deliverAsync();
- continue;
+ return;
}
- Consumer groupConsumer = null;
-
- if (isTrace)
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ if (holder.iter == null)
{
- log.trace("Queue " + this.getName() + " is delivering reference " + ref);
+ holder.iter = messageReferences.iterator();
}
- // If a group id is set, then this overrides the consumer chosen round-robin
+ MessageReference ref;
- SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ if (holder.iter.hasNext())
+ {
+ ref = holder.iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
- if (groupID != null)
+ if (ref == null)
{
- groupConsumer = groups.get(groupID);
+ nullRefCount++;
+ }
+ else
+ {
+ if (checkExpired(ref))
+ {
+ if (isTrace)
+ {
+ log.trace("Reference " + ref + " being expired");
+ }
+ holder.iter.remove();
- if (groupConsumer != null)
+ refRemoved(ref);
+
+ handled++;
+
+ continue;
+ }
+
+ Consumer groupConsumer = null;
+
+ if (isTrace)
{
- consumer = groupConsumer;
+ log.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
- }
- HandleStatus status = handle(ref, consumer);
+ // If a group id is set, then this overrides the consumer chosen round-robin
- if (status == HandleStatus.HANDLED)
- {
- holder.iter.remove();
+ SimpleString groupID = ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
- refRemoved(ref);
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
- if (groupID != null && groupConsumer == null)
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- groups.put(groupID, consumer);
+ holder.iter.remove();
+
+ refRemoved(ref);
+
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ handled++;
}
+ else if (status == HandleStatus.BUSY)
+ {
+ holder.iter.repeat();
- handled++;
+ busyCount++;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ }
}
- else if (status == HandleStatus.BUSY)
- {
- holder.iter.repeat();
- busyCount++;
- }
- else if (status == HandleStatus.NO_MATCH)
+ if (pos == endPos)
{
- }
- }
+ // Round robin'd all
- if (pos == endPos)
- {
- // Round robin'd all
-
- if (nullRefCount + busyCount == size)
- {
- if (log.isDebugEnabled())
+ if (nullRefCount + busyCount == size)
{
- log.debug(this + "::All the consumers were busy, giving up now");
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up now");
+ }
+ break;
}
- break;
+
+ nullRefCount = busyCount = 0;
}
- nullRefCount = busyCount = 0;
- }
+ pos++;
- pos++;
-
- if (pos == size)
- {
- pos = 0;
+ if (pos == size)
+ {
+ pos = 0;
+ }
}
}
@@ -1817,7 +1819,7 @@
}
}
- private synchronized void depage()
+ private void depage()
{
depagePending = false;
13 years, 1 month
JBoss hornetq SVN: r11726 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 09:54:27 -0500 (Mon, 21 Nov 2011)
New Revision: 11726
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java
Log:
Fix on Topology: lazily initialise the transient mapDelete map through a synchronized getter
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java 2011-11-21 14:36:50 UTC (rev 11725)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/Topology.java 2011-11-21 14:54:27 UTC (rev 11726)
@@ -60,7 +60,7 @@
*/
private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
- private transient final Map<String, Long> mapDelete = new ConcurrentHashMap<String, Long>();
+ private transient Map<String, Long> mapDelete;
public Topology(final Object owner)
{
@@ -158,7 +158,7 @@
public boolean updateMember(final long uniqueEventID, final String nodeId, final TopologyMember memberInput)
{
- Long deleteTme = mapDelete.get(nodeId);
+ Long deleteTme = getMapDelete().get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
{
log.debug("Update uniqueEvent=" + uniqueEventID +
@@ -306,7 +306,7 @@
}
else
{
- mapDelete.put(nodeId, uniqueEventID);
+ getMapDelete().put(nodeId, uniqueEventID);
member = topology.remove(nodeId);
}
}
@@ -536,4 +536,13 @@
}
}
+ private synchronized Map<String, Long> getMapDelete()
+ {
+ if (mapDelete == null)
+ {
+ mapDelete = new ConcurrentHashMap<String, Long>();
+ }
+ return mapDelete;
+ }
+
}
13 years, 1 month
JBoss hornetq SVN: r11725 - in branches/Branch_2_2_AS7: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 09:36:50 -0500 (Mon, 21 Nov 2011)
New Revision: 11725
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
Stomp to throw exception on nonexistent destination - HORNETQ-799
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 14:15:17 UTC (rev 11724)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-11-21 14:36:50 UTC (rev 11725)
@@ -61,6 +61,8 @@
// Attributes ----------------------------------------------------
private final HornetQServer server;
+
+ private final SimpleString managementAddress;
private final Executor executor;
@@ -106,6 +108,7 @@
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
+ this.managementAddress = server.getConfiguration().getManagementAddress();
this.executor = server.getExecutorFactory().getExecutor();
}
@@ -341,6 +344,8 @@
}
subscriptionID = "subscription/" + destination;
}
+
+ validateDestination(new SimpleString(destination));
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID))
@@ -583,10 +588,15 @@
*/
private void validateDestination(SimpleString address) throws Exception, HornetQException
{
- Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
- if (binding == null || binding.getBindings().size() == 0)
+ if (!address.equals(managementAddress))
{
- throw new HornetQException(HornetQException.ADDRESS_DOES_NOT_EXIST, "Address " + address + " has not been deployed");
+ Bindings binding = server.getPostOffice().lookupBindingsForAddress(address);
+ if (binding == null || binding.getBindings().size() == 0)
+ {
+ throw new HornetQException(HornetQException.ADDRESS_DOES_NOT_EXIST, "Address " + address +
+ " has not been deployed");
+ }
+
}
}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-11-21 14:15:17 UTC (rev 11724)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-11-21 14:36:50 UTC (rev 11725)
@@ -1061,6 +1061,29 @@
sendFrame(frame);
}
+ public void testSubscribeToInvalidTopic() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName()+"IDontExist" +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("ERROR"));
+ }
+
public void testDurableSubscriberWithReconnection() throws Exception
{
13 years, 1 month
JBoss hornetq SVN: r11724 - branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-21 09:15:17 -0500 (Mon, 21 Nov 2011)
New Revision: 11724
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
We still need AccessController.doPrivileged when instantiating the load balancing policy
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-19 09:16:22 UTC (rev 11723)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-21 14:15:17 UTC (rev 11724)
@@ -318,8 +318,14 @@
{
throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
}
-
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy) ClassloadingUtil.newInstanceFromClassLoader(connectionLoadBalancingPolicyClassName);
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)ClassloadingUtil.newInstanceFromClassLoader(connectionLoadBalancingPolicyClassName);
+ return null;
+ }
+ });
}
private synchronized void initialise() throws HornetQException
13 years, 1 month