[jboss-cvs] JBoss Messaging SVN: r3390 - in branches/Branch_Stable: src/etc/server/default/deploy and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 30 12:55:48 EST 2007
Author: timfox
Date: 2007-11-30 12:55:48 -0500 (Fri, 30 Nov 2007)
New Revision: 3390
Removed:
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java
branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java
Modified:
branches/Branch_Stable/src/etc/prepare-aop.xml
branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java
Log:
Mainly related to JBREM-845 and a few other bits and pieces
Modified: branches/Branch_Stable/src/etc/prepare-aop.xml
===================================================================
--- branches/Branch_Stable/src/etc/prepare-aop.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/prepare-aop.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -18,7 +18,7 @@
<prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised->$implementing{org.jboss.jms.server.endpoint.ConnectionFactoryInternalEndpoint}(..))"/>
<prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConnectionAdvised->$implementing{org.jboss.jms.delegate.ConnectionEndpoint}(..))"/>
- <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.SessionAdvised->$implementing{org.jboss.jms.server.endpoint.SessionInternalEndpoint}(..))"/>
+ <prepare expr="execution(* org.jboss.jms.server.endpoint.advised.SessionAdvised->$implementing{org.jboss.jms.delegate.SessionEndpoint}(..))"/>
<prepare expr="execution(* org.jboss.jms.server.endpoint.advised.ConsumerAdvised->$implementing{org.jboss.jms.delegate.ConsumerEndpoint}(..))"/>
<prepare expr="execution(* org.jboss.jms.server.endpoint.advised.BrowserAdvised->$implementing{org.jboss.jms.delegate.BrowserEndpoint}(..))"/>
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -73,7 +73,7 @@
UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -69,7 +69,7 @@
UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml 2007-11-30 17:55:48 UTC (rev 3390)
@@ -74,7 +74,7 @@
UPDATE_MESSAGE_4CONDITIONAL=UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?
INSERT_MESSAGE_CONDITIONAL_FULL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) SELECT ?, ?, ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
MESSAGE_ID_COLUMN=MESSAGE_ID
- DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
+ DELETE_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)
INSERT_TRANSACTION=INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JBM_TX WHERE NODE_ID = ? AND TRANSACTION_ID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JBM_TX WHERE NODE_ID = ?
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -41,7 +41,6 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.util.Future;
-import org.jboss.messaging.util.Reorderer;
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
@@ -219,8 +218,7 @@
private boolean handleFlowControl;
private long redeliveryDelay;
private volatile int currentToken;
- private Reorderer reorderer;
-
+
// Constructors ---------------------------------------------------------------------------------
public ClientConsumer(boolean isCC, int ackMode,
@@ -228,7 +226,7 @@
String queueName,
int bufferSize, QueuedExecutor sessionExecutor,
int maxDeliveries, boolean shouldAck, boolean handleFlowControl,
- long redeliveryDelay, Reorderer reorderer)
+ long redeliveryDelay)
{
if (bufferSize < 1)
{
@@ -250,14 +248,10 @@
this.shouldAck = shouldAck;
this.handleFlowControl = handleFlowControl;
this.redeliveryDelay = redeliveryDelay;
- this.reorderer = reorderer;
}
// Public ---------------------------------------------------------------------------------------
-
-
-
public boolean isClosed()
{
return closed;
@@ -272,11 +266,6 @@
{
ClientDelivery del = (ClientDelivery)message;
- //We need to make sure the deliveries are in the right order -
- //Due to the way remoting pooling works we cannot guarantee invocations will use the same TCP connection
- //Therefore they can be delivered out of order.
- //We therefore need to re-order them on arrival
-
Message msg = del.getMessage();
MessageProxy proxy = JBossMessage.
@@ -287,7 +276,7 @@
// a message delivery back to the same client which tries to ack but can't get through
// the valve. This won't be necessary when we move to a non blocking transport
- reorderer.handle(new HandleMessageRunnable(currentToken, proxy), del.getDeliveryId());
+ sessionExecutor.execute(new HandleMessageRunnable(currentToken, proxy));
}
public void setMessageListener(MessageListener listener) throws JMSException
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -36,7 +36,6 @@
import org.jboss.jms.exception.MessagingShutdownException;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.Reorderer;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -84,7 +83,6 @@
String consumerID = consumerState.getConsumerID();
int prefetchSize = consumerState.getBufferSize();
QueuedExecutor sessionExecutor = sessionState.getExecutor();
- Reorderer reorderer = sessionState.getReorderer();
int maxDeliveries = consumerState.getMaxDeliveries();
long redeliveryDelay = consumerState.getRedeliveryDelay();
@@ -112,7 +110,7 @@
new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
sessionDelegate, consumerDelegate, consumerID, queueName,
prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
- autoFlowControl, redeliveryDelay, reorderer);
+ autoFlowControl, redeliveryDelay);
sessionState.addCallbackHandler(messageHandler);
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -129,6 +129,8 @@
client = conn.getRemotingClient();
+ onewayClient = conn.getOnewayClient();
+
strictTck = conn.isStrictTck();
}
@@ -140,6 +142,8 @@
client = conn.getRemotingClient();
+ onewayClient = conn.getOnewayClient();
+
strictTck = conn.isStrictTck();
}
@@ -451,31 +455,18 @@
}
public void send(JBossMessage m, boolean checkForDuplicates) throws JMSException
- {
- long seq;
-
- if (m.isReliable() || strictTck)
- {
- seq = -1;
- }
- else
- {
- SessionState sstate = (SessionState)state;
-
- seq = sstate.getNPSendSequence();
-
- sstate.incNpSendSequence();
- }
-
- RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, seq);
+ {
+ boolean oneway = !(m.isReliable() || strictTck);
+
+ RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway);
- if (seq == -1)
- {
- doInvoke(client, req);
+ if (oneway)
+ {
+ doInvokeOneway(onewayClient, req);
}
else
{
- doInvokeOneway(client, req);
+ doInvoke(client, req);
}
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -83,6 +83,8 @@
protected transient byte version;
protected transient Client client;
+
+ protected transient Client onewayClient;
// Static ---------------------------------------------------------------------------------------
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -24,6 +24,7 @@
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import org.jboss.jms.server.ServerPeer;
@@ -126,16 +127,9 @@
metadata.put(Client.CALLBACK_SERVER_PORT, propertyPort);
}
- Map params = serverLocator.getParameters();
- int maxPoolSize = 50;
- if (params != null)
- {
- String val = (String)params.get(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG);
- maxPoolSize = Integer.valueOf((String)val).intValue();
- }
+ //Callback client pool MUST be size 1 so one way messages don't get out of order
- //Use the same value for the callback server
- metadata.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, String.valueOf(maxPoolSize));
+ metadata.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
String protocol = serverLocator.getProtocol();
if ("bisocket".equals(protocol) || "sslbisocket".equals(protocol))
@@ -246,6 +240,7 @@
// Attributes -----------------------------------------------------------------------------------
private Client client;
+ private Client onewayClient;
private boolean clientPing;
private InvokerLocator serverLocator;
private CallbackManager callbackManager;
@@ -282,7 +277,35 @@
client = new Client(serverLocator, config);
client.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
-
+
+ config.putAll(serverLocator.getParameters());
+ config.put(Client.ENABLE_LEASE, "false");
+ config.put(MicroSocketClientInvoker.MAX_POOL_SIZE_FLAG, "1");
+
+ //Hack - need to change the timeout to make sure a different pool is used
+ config.put("timeout", String.valueOf(Integer.MAX_VALUE));
+
+ StringBuffer buff = new StringBuffer();
+ buff.append(serverLocator.getProtocol()).append("://");
+ buff.append(serverLocator.getHost()).append(":").append(serverLocator.getPort());
+ buff.append("/?");
+ Iterator iter = config.entrySet().iterator();
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+ String key = (String)entry.getKey();
+ String val = (String)entry.getValue();
+ buff.append(key).append("=").append(val);
+ if (iter.hasNext())
+ {
+ buff.append("&");
+ }
+ }
+
+ onewayClient = new Client(new InvokerLocator(buff.toString()), config);
+
+ onewayClient.setSubsystem(ServerPeer.REMOTING_JMS_SUBSYSTEM);
+
if (log.isTraceEnabled()) { log.trace(this + " created client"); }
callbackManager = new CallbackManager();
@@ -293,6 +316,7 @@
public Object run() throws Exception
{
client.connect();
+ onewayClient.connect();
return null;
}
});
@@ -304,7 +328,10 @@
client.setMarshaller(new JMSWireFormat());
client.setUnMarshaller(new JMSWireFormat());
-
+
+ onewayClient.setMarshaller(new JMSWireFormat());
+ onewayClient.setUnMarshaller(new JMSWireFormat());
+
Map metadata = new HashMap();
metadata.put(InvokerLocator.DATATYPE, "jms");
@@ -345,9 +372,19 @@
{
log.trace(this + " failed to disconnect the client", ignore);
}
+
+ try
+ {
+ onewayClient.disconnect();
+ }
+ catch (Throwable ignore)
+ {
+ log.trace(this + " failed to disconnect the client", ignore);
+ }
+
+ client = null;
+ onewayClient = null;
- client = null;
-
log.trace(this + " closed");
}
@@ -355,19 +392,23 @@
{
return client;
}
-
+
+ public Client getOnewayClient()
+ {
+ return onewayClient;
+ }
+
public CallbackManager getCallbackManager()
{
return callbackManager;
}
-
public boolean isStrictTck()
{
return strictTck;
}
- public synchronized boolean isFailed()
+ public synchronized boolean isFailed()
{
return failed;
}
@@ -394,6 +435,15 @@
log.trace(this + " failed to set disconnect timeout", ignore);
}
+ try
+ {
+ onewayClient.setDisconnectTimeout(0);
+ }
+ catch (Throwable ignore)
+ {
+ log.trace(this + " failed to set disconnect timeout", ignore);
+ }
+
stop();
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/state/SessionState.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -47,7 +47,6 @@
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.ClearableQueuedExecutor;
-import org.jboss.messaging.util.Reorderer;
import org.jboss.messaging.util.Version;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -112,8 +111,6 @@
private long npSendSequence;
- private Reorderer reorderer;
-
// Constructors ---------------------------------------------------------------------------------
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
@@ -153,8 +150,6 @@
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
-
- this.reorderer = new ReceiveReorderer();
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -339,7 +334,6 @@
}
List recoveryInfos = new ArrayList();
- long maxDeliveryID = 0;
if (!ackInfos.isEmpty())
{
for (Iterator i = ackInfos.iterator(); i.hasNext(); )
@@ -351,7 +345,6 @@
del.getQueueName());
recoveryInfos.add(recInfo);
- maxDeliveryID = Math.max(maxDeliveryID, del.getMessageProxy().getDeliveryId());
}
}
@@ -364,13 +357,7 @@
//like remove from recovery Area refs corresponding to messages in client consumer buffers
newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
-
- reorderer.reset(maxDeliveryID + 1);
}
- else
- {
- reorderer.reset(maxDeliveryID);
- }
}
// Public ---------------------------------------------------------------------------------------
@@ -488,11 +475,6 @@
npSendSequence++;
}
- public Reorderer getReorderer()
- {
- return reorderer;
- }
-
public String toString()
{
return "SessionState[" + sessionID + "]";
@@ -506,13 +488,5 @@
// Inner classes --------------------------------------------------------------------------------
- private class ReceiveReorderer extends Reorderer
- {
- public void execute(Object object) throws Exception
- {
- executor.execute((Runnable)object);
- }
- }
-
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -86,15 +86,12 @@
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.Reorderer;
-import org.jboss.remoting.Client;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
/**
* The server side representation of a JMS session.
@@ -174,8 +171,6 @@
private LinkedQueue toDeliver = new LinkedQueue();
- private Reorderer reorderer = new SendReorderer();
-
private boolean waitingToClose = false;
// Constructors ---------------------------------------------------------------------------------
@@ -337,33 +332,14 @@
{
if (trace) log.trace(this + " closing");
- // Wait for last np message to arrive
-
- if (sequence != 0)
- {
- reorderer.waitToArrive(sequence);
- }
-
return -1;
}
- public void send(JBossMessage message, boolean checkForDuplicates) throws JMSException
+ public void send(JBossMessage message, final boolean checkForDuplicates) throws JMSException
{
- throw new IllegalStateException("Should not be handled on the server");
- }
-
- public void send(JBossMessage message, final boolean checkForDuplicates, long thisSequence) throws JMSException
- {
try
{
- if (thisSequence != -1)
- {
- reorderer.handle(message, thisSequence);
- }
- else
- {
- connectionEndpoint.sendMessage(message, null, checkForDuplicates);
- }
+ connectionEndpoint.sendMessage(message, null, checkForDuplicates);
}
catch (Throwable t)
{
@@ -590,6 +566,8 @@
}
deliveryIdSequence = maxDeliveryId + 1;
+
+ // log.info("*** set deliveryIdSequence to " + deliveryIdSequence);
}
catch (Throwable t)
{
@@ -1269,14 +1247,14 @@
//out of order! There maybe some better way of doing this
synchronized void handleDelivery(Delivery delivery, ServerConsumerEndpoint consumer) throws Exception
{
- long deliveryId = -1;
-
if (trace) { log.trace(this + " handling delivery " + delivery); }
DeliveryRecord rec = null;
- deliveryId = deliveryIdSequence++;
+ long deliveryId = deliveryIdSequence++;
+ // log.info(System.identityHashCode(this) + " Delivery id is now " + deliveryId);
+
if (trace) { log.trace("Delivery id is now " + deliveryId); }
//TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
@@ -1395,25 +1373,19 @@
// We send the message to the client on the current thread. The message is written onto the
// transport and then the thread returns immediately without waiting for a response.
- Client callbackClient = callbackHandler.getCallbackClient();
-
ClientDelivery del = new ClientDelivery(ref.getMessage(), consumer.getID(), deliveryID, ref.getDeliveryCount());
Callback callback = new Callback(del);
-
+
try
{
- //Note - even though remoting cannot guarantee ordering of one way invocations unless client pool size =1
- //in which case performance would be crippled - so this is not an option.
- //We use a reorderer on the client side to re-order deliveries that may have got out of order
-
// one way invocation, no acknowledgment sent back by the client
if (trace) { log.trace(this + " submitting message " + ref.getMessage() + " to the remoting layer to be sent asynchronously"); }
-
- callbackHandler.handleCallbackOneway(callback);
+
+ callbackHandler.handleCallbackOneway(callback);
//We store the delivery id so we know to wait for any deliveries in transit on close
- consumer.setLastDeliveryID(deliveryID);
+ consumer.setLastDeliveryID(deliveryID);
}
catch (Throwable t)
{
@@ -2335,11 +2307,4 @@
}
}
- private class SendReorderer extends Reorderer
- {
- public void execute(Object object) throws Exception
- {
- connectionEndpoint.sendMessage((JBossMessage)object, null, false);
- }
- }
}
Deleted: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/SessionInternalEndpoint.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.endpoint;
-
-import javax.jms.JMSException;
-
-import org.jboss.jms.delegate.SessionEndpoint;
-import org.jboss.jms.message.JBossMessage;
-
-/**
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>5 Oct 2007
- *
- * $Id: $
- *
- */
-public interface SessionInternalEndpoint extends SessionEndpoint
-{
- void send(JBossMessage msg, boolean checkForDuplicates, long seq) throws JMSException;
-}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -35,7 +35,6 @@
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
-import org.jboss.jms.server.endpoint.SessionInternalEndpoint;
/**
* The server-side advised instance corresponding to a Session. It is bound to the AOP
@@ -48,7 +47,7 @@
*
* $Id$
*/
-public class SessionAdvised extends AdvisedSupport implements SessionInternalEndpoint
+public class SessionAdvised extends AdvisedSupport implements SessionEndpoint
{
// Constants -----------------------------------------------------
@@ -79,14 +78,9 @@
public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
{
- throw new IllegalStateException("Invocation should not be handle here");
+ ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates);
}
- public void send(JBossMessage msg, boolean checkForDuplicates, long seq) throws JMSException
- {
- ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates, seq);
- }
-
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer, boolean autoFlowControl) throws JMSException
Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/ConsumerChangeRateRequest.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -66,13 +66,11 @@
ConsumerEndpoint endpoint =
(ConsumerEndpoint)Dispatcher.instance.getTarget(objectId);
- if (endpoint == null)
+ if (endpoint != null)
{
- throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+ endpoint.changeRate(newRate);
}
- endpoint.changeRate(newRate);
-
return null;
}
@@ -83,22 +81,7 @@
os.writeFloat(newRate);
os.flush();
- }
-
- // Until we have NBIO transport this needs to be synchronous otherwise we can get out of sync
- // due to earlier invocations overtaking later invocations.
-
-// public Object getPayload()
-// {
-// OnewayInvocation oi = new OnewayInvocation(this);
-//
-// InvocationRequest request =
-// new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
-// oi, ONE_WAY_METADATA, null, null);
-//
-// return request;
-// }
-
+ }
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -47,8 +47,8 @@
private JBossMessage msg;
private boolean checkForDuplicates;
- private long sequence;
-
+ private boolean oneway;
+
public SessionSendRequest()
{
}
@@ -57,12 +57,12 @@
byte version,
JBossMessage msg,
boolean checkForDuplicates,
- long sequence)
+ boolean oneway)
{
super(objectId, PacketSupport.REQ_SESSION_SEND, version);
this.msg = msg;
this.checkForDuplicates = checkForDuplicates;
- this.sequence = sequence;
+ this.oneway = oneway;
}
public void read(DataInputStream is) throws Exception
@@ -77,7 +77,7 @@
checkForDuplicates = is.readBoolean();
- sequence = is.readLong();
+ oneway = is.readBoolean();
}
public ResponseSupport serverInvoke() throws Exception
@@ -87,21 +87,14 @@
if (advised != null)
{
- advised.send(msg, checkForDuplicates, sequence);
+ advised.send(msg, checkForDuplicates);
}
else
{
- if (sequence == -1)
- {
- //Persistent message
-
- throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
- }
- else
- {
- // Since NP messages are sent one way, there is a possibility the session has closed
- //by the time the message arrives, so we ignore this
- }
+ if (!oneway)
+ {
+ throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+ }
}
return null;
@@ -117,14 +110,14 @@
os.writeBoolean(checkForDuplicates);
- os.writeLong(sequence);
+ os.writeBoolean(oneway);
os.flush();
}
public Object getPayload()
{
- if (sequence != -1)
+ if (oneway)
{
OnewayInvocation oi = new OnewayInvocation(this);
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -2367,7 +2367,7 @@
map.put("CREATE_IDX_MESSAGE_REF_MESSAGE_ID",
"CREATE INDEX JBM_MSG_REF_MESSAGE_ID ON JBM_MSG_REF (MESSAGE_ID)");
map.put("CREATE_IDX_MESSAGE_REF_SCHED_DELIVERY",
- "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
+ "CREATE INDEX JBM_MSG_REF_SCHED_DELIVERY ON JBM_MSG_REF (SCHED_DELIVERY)");
// Message
map.put("CREATE_MESSAGE",
"CREATE TABLE JBM_MSG (MESSAGE_ID BIGINT, RELIABLE CHAR(1), "
@@ -2450,7 +2450,7 @@
"UPDATE JBM_MSG SET HEADERS=?, PAYLOAD=? WHERE MESSAGE_ID=?");
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("DELETE_MESSAGE",
- "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT * FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+ "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
// Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
Deleted: branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/util/Reorderer.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.logging.Logger;
-
-/**
- *
- * A Reorderer
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public abstract class Reorderer
-{
- private static final Logger log = Logger.getLogger(Reorderer.class);
-
- private static final long WAIT_TIMEOUT = 5 * 1000;
-
- private long expectedSequence = 0;
-
- private Map<Long, Object> heldBack = new HashMap<Long, Object>();
-
- public synchronized void reset(long expectedSequence)
- {
- heldBack.clear();
-
- this.expectedSequence = expectedSequence;
- }
-
- public synchronized void handle(Object object, long thisSequence) throws Exception
- {
- //Need to make sure it is in correct order since np messages are sent
- //one way so they can arrive out of sequence
-
- //This is a workaround to allow us to use one way messages for np messages for performance
- //reasons
-
- if (thisSequence == expectedSequence)
- {
- do
- {
- execute(object);
-
- expectedSequence++;
-
- object = heldBack.remove(expectedSequence);
-
- } while (object != null);
- }
- else
- {
- //Not the expected one - add it to the map
-
- heldBack.put(thisSequence, object);
- }
-
- notify();
- }
-
- public synchronized void waitToArrive(long sequence)
- {
- long wait = WAIT_TIMEOUT;
-
- while (sequence != expectedSequence && wait > 0)
- {
- long start = System.currentTimeMillis();
- try
- {
- wait();
- }
- catch (InterruptedException e)
- {
- }
- wait -= (System.currentTimeMillis() - start);
- }
-
- if (wait <= 0)
- {
- log.warn("Timed out waiting for last message");
- }
- }
-
- public abstract void execute(Object object) throws Exception;
-}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -82,6 +82,10 @@
TextMessage m = session.createTextMessage("message one");
prod.send(m);
+
+ // It's np so give it some time to hit the server before closing
+
+ Thread.sleep(2000);
conn.close();
@@ -124,6 +128,10 @@
m.setText("message one");
prod.send(m);
+
+ //It's np so give it some time to hit the server before closing
+
+ Thread.sleep(2000);
conn.close();
@@ -164,6 +172,11 @@
TextMessage m = session.createTextMessage("message one");
prod.send(m);
+
+ // It's np so give it some time to hit the server before closing
+
+ Thread.sleep(2000);
+
conn.close();
@@ -291,6 +304,11 @@
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage m = session.createTextMessage("one");
prod.send(m);
+
+ //It's np so give it some time to hit the server before closing
+
+ Thread.sleep(2000);
+
conn.close();
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -667,7 +667,7 @@
JBossMessage msg = new JBossMessage(123);
RequestSupport req =
- new SessionSendRequest("23", (byte)77, msg, false, 123);
+ new SessionSendRequest("23", (byte)77, msg, false, true);
testPacket(req, PacketSupport.REQ_SESSION_SEND);
}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/tools/container/RemotingJMXWrapper.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -70,9 +70,14 @@
{
return;
}
-
- connector = new Connector();
- connector.setInvokerLocator(locator.getLocatorURI());
+
+ Map config = new HashMap();
+
+ config.put("onewayThreadPool", "org.jboss.jms.server.remoting.DirectThreadPool");
+
+ connector = new Connector(locator, config);
+
+ connector.create();
connector.start();
}
Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java 2007-11-30 17:46:34 UTC (rev 3389)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/util/ReordererTest.java 2007-11-30 17:55:48 UTC (rev 3390)
@@ -1,188 +0,0 @@
-/**
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.test.messaging.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.util.Reorderer;
-import org.jboss.test.messaging.MessagingTestCase;
-
-/**
- *
- * A ReordererTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class ReordererTest extends MessagingTestCase
-{
- // Constants ------------------------------------------------------------------------------------
-
- // Static ---------------------------------------------------------------------------------------
-
- // Attributes -----------------------------------------------------------------------------------
-
- // Constructors ---------------------------------------------------------------------------------
-
- public ReordererTest(String name)
- {
- super(name);
- }
-
- // Public ---------------------------------------------------------------------------------------
-
- public void testOrder() throws Exception
- {
- final List<Integer> list = new ArrayList<Integer>();
-
- class MyReorderer extends Reorderer
- {
- @Override
- public void execute(Object object) throws Exception
- {
- list.add((Integer)object);
- }
- }
-
- Reorderer reorderer = new MyReorderer();
-
- reorderer.handle(3, 3);
- reorderer.handle(1, 1);
- reorderer.handle(9, 9);
- reorderer.handle(6, 6);
- reorderer.handle(0, 0);
- reorderer.handle(2, 2);
- reorderer.handle(5, 5);
- reorderer.handle(8, 8);
- reorderer.handle(7, 7);
- reorderer.handle(4, 4);
-
- for (int i = 0; i < 10; i++)
- {
- assertEquals(Integer.valueOf(i), (Integer)list.get(i));
- }
-
- }
-
- public void testHoldBack() throws Exception
- {
- final List<Integer> list = new ArrayList<Integer>();
-
- class MyReorderer extends Reorderer
- {
- @Override
- public void execute(Object object) throws Exception
- {
- list.add((Integer)object);
- }
- }
-
- Reorderer reorderer = new MyReorderer();
-
- reorderer.handle(3, 3);
- assertTrue(list.isEmpty());
- reorderer.handle(1, 1);
- assertTrue(list.isEmpty());
- reorderer.handle(9, 9);
- assertTrue(list.isEmpty());
- reorderer.handle(6, 6);
- assertTrue(list.isEmpty());
- reorderer.handle(0, 0);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(2, list.size());
- reorderer.handle(2, 2);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(Integer.valueOf(2), (Integer)list.get(2));
- assertEquals(Integer.valueOf(3), (Integer)list.get(3));
- assertEquals(4, list.size());
- reorderer.handle(5, 5);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(Integer.valueOf(2), (Integer)list.get(2));
- assertEquals(Integer.valueOf(3), (Integer)list.get(3));
- assertEquals(4, list.size());
- reorderer.handle(4, 4);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(Integer.valueOf(2), (Integer)list.get(2));
- assertEquals(Integer.valueOf(3), (Integer)list.get(3));
- assertEquals(Integer.valueOf(4), (Integer)list.get(4));
- assertEquals(Integer.valueOf(5), (Integer)list.get(5));
- assertEquals(Integer.valueOf(6), (Integer)list.get(6));
- assertEquals(7, list.size());
- reorderer.handle(8, 8);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(Integer.valueOf(2), (Integer)list.get(2));
- assertEquals(Integer.valueOf(3), (Integer)list.get(3));
- assertEquals(Integer.valueOf(4), (Integer)list.get(4));
- assertEquals(Integer.valueOf(5), (Integer)list.get(5));
- assertEquals(Integer.valueOf(6), (Integer)list.get(6));
- assertEquals(7, list.size());
- reorderer.handle(7, 7);
- assertEquals(Integer.valueOf(0), (Integer)list.get(0));
- assertEquals(Integer.valueOf(1), (Integer)list.get(1));
- assertEquals(Integer.valueOf(2), (Integer)list.get(2));
- assertEquals(Integer.valueOf(3), (Integer)list.get(3));
- assertEquals(Integer.valueOf(4), (Integer)list.get(4));
- assertEquals(Integer.valueOf(5), (Integer)list.get(5));
- assertEquals(Integer.valueOf(6), (Integer)list.get(6));
- assertEquals(Integer.valueOf(7), (Integer)list.get(7));
- assertEquals(Integer.valueOf(8), (Integer)list.get(8));
- assertEquals(Integer.valueOf(9), (Integer)list.get(9));
- assertEquals(10, list.size());
-
-
- for (int i = 0; i < 10; i++)
- {
- assertEquals(Integer.valueOf(i), (Integer)list.get(i));
- }
-
- }
-
- public void testOrderMustStartWithZero() throws Exception
- {
- final List<Integer> list = new ArrayList<Integer>();
-
- class MyReorderer extends Reorderer
- {
- @Override
- public void execute(Object object) throws Exception
- {
- list.add((Integer)object);
- }
- }
-
- Reorderer reorderer = new MyReorderer();
-
- reorderer.handle(3, 3);
- reorderer.handle(1, 1);
- reorderer.handle(9, 9);
- reorderer.handle(6, 6);
- reorderer.handle(2, 2);
- reorderer.handle(5, 5);
- reorderer.handle(8, 8);
- reorderer.handle(7, 7);
- reorderer.handle(4, 4);
-
- assertTrue(list.isEmpty());
-
- }
-
- // Package protected ----------------------------------------------------------------------------
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
-
-}
More information about the jboss-cvs-commits mailing list