[Jboss-cvs] JBoss Messaging SVN: r1271 - in trunk: src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/util src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/jms/server/destination tests/src/org/jboss/test/messaging/jms/server/destination/base tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Sep 9 09:46:03 EDT 2006
Author: timfox
Date: 2006-09-09 09:45:46 -0400 (Sat, 09 Sep 2006)
New Revision: 1271
Added:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
Removed:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ExchangeInternal.java
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
trunk/src/main/org/jboss/jms/server/destination/QueueService.java
trunk/src/main/org/jboss/jms/util/MessageQueueNameHelper.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/local/Queue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckMessage.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
trunk/tests/src/org/jboss/test/messaging/core/base/QueueTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/server/destination/base/DestinationManagementTestBase.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
Log:
More clustering work
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -836,7 +836,7 @@
//this should be combined
String destType = isQueue ? "Queue" : "Topic";
- String className = "org.jboss.jms.server.destination." + destType;
+ String className = "org.jboss.jms.server.destination." + destType + "Service";
String ons ="jboss.messaging.destination:service="+ destType + ",name=" + name;
ObjectName on = new ObjectName(ons);
@@ -900,7 +900,9 @@
//
String destType = isQueue ? "Queue" : "Topic";
- String className = "org.jboss.jms.server.destination." + destType;
+ String className = "org.jboss.jms.server.destination." + destType + "Service";
+
+ log.info("class name is " + className);
String ons ="jboss.messaging.destination:service="+ destType + ",name=" + name;
ObjectName on = new ObjectName(ons);
@@ -915,6 +917,8 @@
" <attribute name=\"PageSize\">" + pageSize + "</attribute>" +
" <attribute name=\"DownCacheSize\">" + downCacheSize + "</attribute>" +
"</mbean>";
+
+ log.info(destinationMBeanConfig);
return createDestinationInternal(destinationMBeanConfig, on, jndiName, true, fullSize,
pageSize, downCacheSize);
Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -36,10 +36,8 @@
{
// Constants -----------------------------------------------------
-
// Static --------------------------------------------------------
-
// Attributes ----------------------------------------------------
private ObjectName serverPeerObjectName;
@@ -64,7 +62,18 @@
protected IdManager idm;
+ private boolean createdProgrammatically;
+
// Constructors --------------------------------------------------
+
+ public DestinationServiceSupport(boolean createdProgrammatically)
+ {
+ this.createdProgrammatically = createdProgrammatically;
+ }
+
+ public DestinationServiceSupport()
+ {
+ }
// ServiceMBeanSupport overrides -----------------------------------
@@ -262,6 +271,11 @@
}
destination.setClustered(clustered);
}
+
+ public boolean isCreatedProgrammatically()
+ {
+ return createdProgrammatically;
+ }
// JMX managed operations ----------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -28,7 +28,6 @@
import org.jboss.jms.selector.Selector;
import org.jboss.messaging.core.plugin.contract.Binding;
-import org.w3c.dom.Element;
/**
* A ManagedQueue
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -31,7 +31,6 @@
import org.jboss.jms.util.MessageQueueNameHelper;
import org.jboss.messaging.core.local.Queue;
import org.jboss.messaging.core.plugin.contract.Binding;
-import org.w3c.dom.Element;
/**
* A ManagedTopic
Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -40,6 +40,13 @@
destination = new ManagedQueue();
}
+ public QueueService(boolean createdProgrammatically)
+ {
+ super(createdProgrammatically);
+
+ destination = new ManagedQueue();
+ }
+
// JMX managed attributes ----------------------------------------
public int getMessageCount() throws Exception
Modified: trunk/src/main/org/jboss/jms/util/MessageQueueNameHelper.java
===================================================================
--- trunk/src/main/org/jboss/jms/util/MessageQueueNameHelper.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/jms/util/MessageQueueNameHelper.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -24,7 +24,7 @@
import java.util.StringTokenizer;
/**
- * A JMSExchangeHelper
+ * A MessageQueueNameHelper
*
* By convention, we name durable topic message queue names in the following way:
*
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -681,6 +681,8 @@
if (trace) { log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx)); }
+ log.info("handleInternal");
+
//Each channel has its own copy of the reference
ref = ref.copy();
@@ -723,13 +725,17 @@
}
addReferenceInMemory(ref);
+
+ log.info("added in memory");
// We only do delivery if there are receivers that haven't said they don't want
// any more references.
if (receiversReady)
{
// Prompt delivery
+ log.info("delivering");
deliverInternal();
+ log.info("delivered");
}
}
else
Modified: trunk/src/main/org/jboss/messaging/core/local/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/Queue.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/local/Queue.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -87,6 +87,8 @@
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
{
+ log.info("handle:" + ref);
+
//If the queue has a Filter we do not accept any Message references that do not
//match the Filter
if (filter != null && !filter.accept(ref))
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -114,7 +114,7 @@
super.stop();
}
- // Exchange implementation ---------------------------------------
+ // PostOffice implementation ---------------------------------------
public Binding bindQueue(String queueName, String condition, boolean noLocal,
Queue queue) throws Exception
@@ -567,21 +567,21 @@
{
Map map = new HashMap();
map.put("INSERT_BINDING",
- "INSERT INTO JMS_EXCHANGE_BINDING (EXCHANGE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, NOLOCAL, CHANNEL_ID) " +
+ "INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, NOLOCAL, CHANNEL_ID) " +
"VALUES (?, ?, ?, ?, ?, ?, ?)");
map.put("DELETE_BINDING",
- "DELETE FROM JMS_EXCHANGE_BINDING WHERE EXCHANGE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
+ "DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?");
map.put("LOAD_BINDINGS",
- "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, NOLOCAL, CHANNEL_ID FROM JMS_EXCHANGE_BINDING " +
- "WHERE EXCHANGE_NAME = ?");
+ "SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, NOLOCAL, CHANNEL_ID FROM JMS_POSTOFFICE " +
+ "WHERE POSTOFFICE_NAME = ?");
return map;
}
protected Map getDefaultDDLStatements()
{
Map map = new HashMap();
- map.put("CREATE_BINDING_TABLE",
- "CREATE TABLE JMS_EXCHANGE_BINDING (EXCHANGE_NAME VARCHAR(256), NODE_ID VARCHAR(256)," +
+ map.put("CREATE_POSTOFFICE_TABLE",
+ "CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(256), NODE_ID VARCHAR(256)," +
"QUEUE_NAME VARCHAR(1024), CONDITION VARCHAR(1024), " +
"SELECTOR VARCHAR(1024), NOLOCAL CHAR(1), CHANNEL_ID BIGINT)");
return map;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/BindRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -66,9 +66,9 @@
this.durable = durable;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
- exchange.addBindingFromCluster(nodeId, queueName, condition,
+ office.addBindingFromCluster(nodeId, queueName, condition,
filterString, noLocal, channelId, durable);
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -67,7 +67,7 @@
private long txId;
- private ExchangeInternal exchange;
+ private PostOfficeInternal office;
void addMessage(String routingKey, Message message)
{
@@ -91,13 +91,13 @@
}
}
- CastMessagesCallback(String nodeId, long txId, ExchangeInternal exchange)
+ CastMessagesCallback(String nodeId, long txId, PostOfficeInternal office)
{
this.nodeId = nodeId;
this.txId = txId;
- this.exchange = exchange;
+ this.office = office;
}
public void afterCommit(boolean onePhase) throws Exception
@@ -107,7 +107,7 @@
// Cast the non persistent - this don't need to go into a holding area on the receiving node
ClusterRequest req = new MessagesRequest(nonPersistent);
- exchange.asyncSendRequest(req);
+ office.asyncSendRequest(req);
}
if (persistent != null)
@@ -116,7 +116,7 @@
ClusterRequest req = new TransactionRequest(nodeId, txId);
// Stack must be FIFO
- exchange.asyncSendRequest(req);
+ office.asyncSendRequest(req);
}
nonPersistent = persistent = null;
@@ -139,7 +139,7 @@
ClusterRequest req = new TransactionRequest(nodeId, txId, persistent);
//Stack must be FIFO
- exchange.asyncSendRequest(req);
+ office.asyncSendRequest(req);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckMessage.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CheckMessage.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -41,8 +41,8 @@
this.nodeId = nodeId;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
- exchange.check(nodeId);
+ office.check(nodeId);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -36,5 +36,5 @@
*/
interface ClusterRequest extends Serializable
{
- void execute(ExchangeInternal exchange) throws Exception;
+ void execute(PostOfficeInternal office) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -66,7 +66,7 @@
* $Id$
*
*/
-public class ClusteredPostOfficeImpl extends PostOfficeImpl implements ClusteredPostOffice, ExchangeInternal
+public class ClusteredPostOfficeImpl extends PostOfficeImpl implements ClusteredPostOffice, PostOfficeInternal
{
private static final Logger log = Logger.getLogger(ClusteredPostOfficeImpl.class);
@@ -209,7 +209,7 @@
this.controlMessageListener = new ControlMessageListener();
- this.requestHandler = new ExchangeRequestHandler();
+ this.requestHandler = new PostOfficeRequestHandler();
this.controlMembershipListener = new ControlMembershipListener();
@@ -370,7 +370,7 @@
}
else
{
- //It's a binding on a different exchange instance on the cluster
+ //It's a binding on a different office instance on the cluster
sendRemotely = true;
if (ref.isReliable() && binding.isDurable())
@@ -383,7 +383,7 @@
}
//Now we've sent the message to all the local subscriptions, we might also need
- //to multicast the message to the other exchange instances on the cluster if there are
+ //to multicast the message to the other office instances on the cluster if there are
//subscriptions on those nodes that need to receive the message
if (sendRemotely)
{
@@ -424,7 +424,7 @@
return true;
}
- // ExchangeInternal implementation ------------------------------------------------------------------
+ // PostOfficeInternal implementation ------------------------------------------------------------------
/*
* Called when another node adds a binding
@@ -693,9 +693,7 @@
return false;
}
}
-
- //ExchangeSupport overrides -------------------------------------------------
-
+
protected void loadBindings() throws Exception
{
// TODO I need to know whether this call times out - how do I know this??
@@ -1081,7 +1079,7 @@
/*
* This class is used to handle synchronous requests
*/
- private class ExchangeRequestHandler implements RequestHandler
+ private class PostOfficeRequestHandler implements RequestHandler
{
public Object handle(Message message)
{
Deleted: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ExchangeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ExchangeInternal.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ExchangeInternal.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -1,59 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.plugin.postoffice.cluster;
-
-import java.util.List;
-
-import org.jboss.messaging.core.Message;
-import org.jgroups.Address;
-
-/**
- * A ExchangeInternal
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-interface ExchangeInternal
-{
- void addBindingFromCluster(String nodeId, String queueName, String condition,
- String filterString, boolean noLocal, long channelId, boolean durable)
- throws Exception;
-
- void removeBindingFromCluster(String nodeId, String queueName)
- throws Exception;
-
- void handleAddressNodeMapping(Address address, String nodeId)
- throws Exception;
-
- void routeFromCluster(Message message, String routingKey) throws Exception;
-
- void asyncSendRequest(ClusterRequest request) throws Exception;
-
- void addToHoldingArea(TransactionId id, List messageHolders) throws Exception;
-
- void commitTransaction(TransactionId id) throws Exception;
-
- void check(String nodeId) throws Exception;
-}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -49,8 +49,8 @@
this.message = message;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
- exchange.routeFromCluster(message, routingKey);
+ office.routeFromCluster(message, routingKey);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessagesRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -46,7 +46,7 @@
this.messageHolders = messageHolders;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
Iterator iter = messageHolders.iterator();
@@ -54,7 +54,7 @@
{
MessageHolder holder = (MessageHolder)iter.next();
- exchange.routeFromCluster(holder.getMessage(), holder.getRoutingKey());
+ office.routeFromCluster(holder.getMessage(), holder.getRoutingKey());
}
}
}
Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.Message;
+import org.jgroups.Address;
+
+/**
+ *
+ * A PostOfficeInternal
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+interface PostOfficeInternal
+{
+ void addBindingFromCluster(String nodeId, String queueName, String condition,
+ String filterString, boolean noLocal, long channelId, boolean durable)
+ throws Exception;
+
+ void removeBindingFromCluster(String nodeId, String queueName)
+ throws Exception;
+
+ void handleAddressNodeMapping(Address address, String nodeId)
+ throws Exception;
+
+ void routeFromCluster(Message message, String routingKey) throws Exception;
+
+ void asyncSendRequest(ClusterRequest request) throws Exception;
+
+ void addToHoldingArea(TransactionId id, List messageHolders) throws Exception;
+
+ void commitTransaction(TransactionId id) throws Exception;
+
+ void check(String nodeId) throws Exception;
+}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -48,8 +48,8 @@
this.nodeId = nodeId;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
- exchange.handleAddressNodeMapping(address, nodeId);
+ office.handleAddressNodeMapping(address, nodeId);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/TransactionRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -66,17 +66,17 @@
tryTransaction = false;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
TransactionId id = new TransactionId(nodeId, txId);
if (tryTransaction)
{
- exchange.addToHoldingArea(id, messageHolders);
+ office.addToHoldingArea(id, messageHolders);
}
else
{
- exchange.commitTransaction(id);
+ office.commitTransaction(id);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/UnbindRequest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -46,8 +46,8 @@
this.queueName = queueName;
}
- public void execute(ExchangeInternal exchange) throws Exception
+ public void execute(PostOfficeInternal office) throws Exception
{
- exchange.removeBindingFromCluster(nodeId, queueName);
+ office.removeBindingFromCluster(nodeId, queueName);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/base/QueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/base/QueueTestBase.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/core/base/QueueTestBase.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.core.plugin.IdManager;
import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
import org.jboss.messaging.core.plugin.SimpleMessageStore;
@@ -194,13 +195,21 @@
return;
}
+ log.info("starting");
+
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
SimpleReceiver r1 = new SimpleReceiver("ONE", SimpleReceiver.ACKING);
SimpleReceiver r2 = new SimpleReceiver("TWO", SimpleReceiver.ACKING);
+
+ log.info("created");
queue.add(r1);
queue.add(r2);
+
+ log.info("handling");
Delivery d = queue.handle(observer, createReference(0, false, "payload"), null);
+
+ log.info("handled");
assertTrue(d.isDone());
List l1 = r1.getMessages();
@@ -6372,12 +6381,12 @@
private MessageReference createReference(long id, boolean reliable, Serializable payload)
{
- return createReference(id, reliable, payload);
+ return ms.reference(MessageFactory.createCoreMessage(id, reliable, payload));
}
private MessageReference createReference(long id)
{
- return createReference(id);
+ return ms.reference(MessageFactory.createCoreMessage(id));
}
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -37,7 +37,7 @@
/**
*
- * A ClusteredTopicExchangeTest
+ * A ClusteredPostOfficeTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
@@ -125,7 +125,7 @@
try
{
- //Start one exchange
+ //Start one office
office1 = createClusteredPostOffice("node1", "testgroup");
@@ -138,7 +138,7 @@
Binding binding2 =
office1.bindClusteredQueue("sub2", "topic1", false, queue2);
- //Start another exchange - make sure it picks up the bindings from the first node
+ //Start another office - make sure it picks up the bindings from the first node
office2 = createClusteredPostOffice("node2", "testgroup");
@@ -219,7 +219,7 @@
assertEquivalent(binding3, (Binding)bindings.get(0));
assertEquivalent(binding4, (Binding)bindings.get(1));
- //Add a third exchange
+ //Add a third office
office3 = createClusteredPostOffice("node3", "testgroup");
@@ -306,7 +306,7 @@
assertEquivalent(binding6, (Binding)bindings.get(3));
assertEquivalent(binding7, (Binding)bindings.get(4));
- //Stop exchange 1
+ //Stop office 1
office1.stop();
//Need to sleep since it may take some time for the view changed request to reach the
@@ -335,7 +335,7 @@
assertEquivalent(binding5, (Binding)bindings.get(2));
assertEquivalent(binding6, (Binding)bindings.get(3));
- //Stop exchange 2
+ //Stop office 2
office2.stop();
bindings = office3.listBindingsForCondition("topic1");
@@ -345,7 +345,7 @@
assertEquivalent(binding5, (Binding)bindings.get(0));
assertEquivalent(binding6, (Binding)bindings.get(1));
- //Restart exchange 1 and exchange 2
+ //Restart office 1 and office 2
office1 = createClusteredPostOffice("node1", "testgroup");
office2 = createClusteredPostOffice("node2", "testgroup");
@@ -371,7 +371,7 @@
assertEquivalent(binding5, (Binding)bindings.get(0));
assertEquivalent(binding6, (Binding)bindings.get(1));
- //Stop all exchanges
+ //Stop all offices
office1.stop();
office2.stop();
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -186,7 +186,7 @@
office1.stop();
- //Throw away the exchange and create another
+ //Throw away the office and create another
office2 = createPostOffice();
//Only one binding should be there
@@ -210,7 +210,7 @@
office2.stop();
- //Throw away exchange and start another
+ //Throw away office and start another
office3 = createPostOffice();
//Make sure not there
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -79,7 +79,7 @@
public void testReloadQueue() throws Exception
{
String config =
- "<mbean code=\"org.jboss.jms.server.destination.Queue\" " +
+ "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
" name=\"somedomain:service=Queue,name=ReloadQueue\"" +
" xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/TopicManagementTest.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -83,7 +83,7 @@
public void testReloadTopic() throws Exception
{
String config =
- "<mbean code=\"org.jboss.jms.server.destination.Topic\" " +
+ "<mbean code=\"org.jboss.jms.server.destination.TopicService\" " +
" name=\"somedomain:service=Topic,name=ReloadTopic\"" +
" xmbean-dd=\"xmdesc/Topic-xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
Modified: trunk/tests/src/org/jboss/test/messaging/jms/server/destination/base/DestinationManagementTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/destination/base/DestinationManagementTestBase.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/destination/base/DestinationManagementTestBase.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -34,6 +34,8 @@
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.server.destination.ManagedDestination;
+import org.jboss.jms.server.destination.ManagedQueue;
+import org.jboss.jms.server.destination.ManagedTopic;
import org.jboss.jms.util.XMLUtil;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -89,7 +91,7 @@
ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"somedomain:service=@TOREPLACE@,name=Kirkwood\"" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
@@ -111,13 +113,13 @@
if (isQueue())
{
- Queue q = (Queue)destinations.iterator().next();
- assertEquals("Kirkwood", q.getQueueName());
+ ManagedQueue q = (ManagedQueue)destinations.iterator().next();
+ assertEquals("Kirkwood", q.getName());
}
else
{
- Topic t = (Topic)destinations.iterator().next();
- assertEquals("Kirkwood", t.getTopicName());
+ ManagedTopic t = (ManagedTopic)destinations.iterator().next();
+ assertEquals("Kirkwood", t.getName());
}
assertEquals(serverPeerObjectName,
@@ -144,7 +146,7 @@
int downCacheSize = 789;
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"somedomain:service=@TOREPLACE@,name=Kirkwood\"" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
@@ -169,14 +171,14 @@
if (isQueue())
{
- Queue q = (Queue)destinations.iterator().next();
- assertEquals("Kirkwood", q.getQueueName());
+ ManagedQueue q = (ManagedQueue)destinations.iterator().next();
+ assertEquals("Kirkwood", q.getName());
}
else
{
- Topic t = (Topic)destinations.iterator().next();
- assertEquals("Kirkwood", t.getTopicName());
+ ManagedTopic t = (ManagedTopic)destinations.iterator().next();
+ assertEquals("Kirkwood", t.getName());
}
assertEquals(serverPeerObjectName,
@@ -199,7 +201,7 @@
public void testDefaultSecurityConfiguration() throws Exception
{
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"somedomain:service=@TOREPLACE@,name=DefaultSecurity\"" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
@@ -224,7 +226,7 @@
" </security>";
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\"\n" +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\"\n" +
" name=\"somedomain:service=@TOREPLACE@,name=DefaultSecurity\"\n" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">\n" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n" +
@@ -250,7 +252,7 @@
String testJNDIName = "/a/totally/arbitrary/jndi/name/thisisthequeue";
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"somedomain:service=@TOREPLACE@,name=Kirkwood\"" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
@@ -448,7 +450,7 @@
// deploy "classically"
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"jboss.messaging.destination:service=@TOREPLACE@,name=" + destinationName + "\" " +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">" + serverPeerObjectName + "</depends>" +
@@ -476,7 +478,7 @@
int downCacheSize = 56;
String config =
- "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@\" " +
+ "<mbean code=\"org.jboss.jms.server.destination. at TOREPLACE@Service\" " +
" name=\"somedomain:service=@TOREPLACE@,name=PageableAttributes\"" +
" xmbean-dd=\"xmdesc/@TOREPLACE at -xmbean.xml\">" +
" <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -424,16 +424,6 @@
return serverPeerObjectName;
}
- public ObjectName getDirectExchangeObjectName()
- {
- return queuePostOfficeObjectName;
- }
-
- public ObjectName getTopicExchangeObjectName()
- {
- return topicPostOfficeObjectName;
- }
-
public Set getConnectorSubsystems() throws Exception
{
RemotingJMXWrapper remoting =
@@ -480,13 +470,13 @@
return serverPeer.getPersistenceManagerInstance();
}
- public PostOffice getDirectExchange() throws Exception
+ public PostOffice getQueuePostOffice() throws Exception
{
return (PostOffice)sc.
getAttribute(queuePostOfficeObjectName, "Instance");
}
- public PostOffice getTopicExchange() throws Exception
+ public PostOffice getTopicPostOffice() throws Exception
{
return (PostOffice)sc.
getAttribute(topicPostOfficeObjectName, "Instance");
@@ -565,7 +555,7 @@
log.info("deploying queue, fullsize:" + fullSize + ", ps:" + pageSize + " dc size:" + downCacheSize);
String config =
- "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "Queue" : "Topic") + "\"" +
+ "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") + "\"" +
" name=\"jboss.messaging.destination:service=" + (isQueue ? "Queue" : "Topic") + ",name=" + name + "\"" +
" xmbean-dd=\"xmdesc/" + (isQueue ? "Queue" : "Topic" ) + "-xmbean.xml\">" +
(jndiName != null ? " <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -247,14 +247,14 @@
return server.getPersistenceManager();
}
- public PostOffice getDirectExchange() throws Exception
+ public PostOffice getQueuePostOffice() throws Exception
{
- return server.getDirectExchange();
+ return server.getQueuePostOffice();
}
- public PostOffice getTopicExchange() throws Exception
+ public PostOffice getTopicPostOffice() throws Exception
{
- return server.getTopicExchange();
+ return server.getTopicPostOffice();
}
public ObjectName getServerPeerObjectName() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -74,7 +74,7 @@
/**
* Only for in-VM use!
*/
- public PostOffice getDirectExchange() throws Exception
+ public PostOffice getQueuePostOffice() throws Exception
{
throw new UnsupportedOperationException("This method shouldn't be invoked on a remote server");
}
@@ -82,7 +82,7 @@
/**
* Only for in-VM use!
*/
- public PostOffice getTopicExchange() throws Exception
+ public PostOffice getTopicPostOffice() throws Exception
{
throw new UnsupportedOperationException("This method shouldn't be invoked on a remote server");
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-09-09 11:48:48 UTC (rev 1270)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-09-09 13:45:46 UTC (rev 1271)
@@ -121,9 +121,9 @@
/**
* Only for in-VM use!
*/
- PostOffice getDirectExchange() throws Exception;
+ PostOffice getQueuePostOffice() throws Exception;
- PostOffice getTopicExchange() throws Exception;
+ PostOffice getTopicPostOffice() throws Exception;
/**
* Only for in-VM use
More information about the jboss-cvs-commits
mailing list