[jboss-cvs] JBoss Messaging SVN: r3617 - in trunk/src/main/org/jboss/jms: client/api and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 23 04:40:25 EST 2008
Author: timfox
Date: 2008-01-23 04:40:25 -0500 (Wed, 23 Jan 2008)
New Revision: 3617
Modified:
trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/MessageHandler.java
trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
ack / tx changes part I
Modified: trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/AsfMessageHolder.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -43,18 +43,15 @@
private ClientSession connectionConsumerSession;
- private boolean shouldAck;
-
public AsfMessageHolder(JBossMessage msg, String consumerID,
String queueName, int maxDeliveries,
- ClientSession connectionConsumerSession, boolean shouldAck)
+ ClientSession connectionConsumerSession)
{
this.msg = msg;
this.consumerID = consumerID;
this.queueName = queueName;
this.maxDeliveries = maxDeliveries;
this.connectionConsumerSession = connectionConsumerSession;
- this.shouldAck = shouldAck;
}
public JBossMessage getMsg()
@@ -106,16 +103,4 @@
{
this.connectionConsumerSession = connectionConsumerSession;
}
-
- public boolean isShouldAck()
- {
- return shouldAck;
- }
-
- public void setShouldAck(boolean shouldAck)
- {
- this.shouldAck = shouldAck;
- }
-
-
}
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -95,8 +95,6 @@
private String queueName;
- private boolean shouldAck;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -122,9 +120,7 @@
this.consumerID = cons.getID();
this.maxDeliveries = cons.getMaxDeliveries();
-
- shouldAck = cons.isShouldAck();
-
+
if (subName != null)
{
queueName = MessageQueueNameHelper.createSubscriptionName(conn.getClientID(), subName);
@@ -280,7 +276,7 @@
for (int i = 0; i < mesList.size(); i++)
{
JBossMessage m = (JBossMessage)mesList.get(i);
- session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess, shouldAck);
+ session.addAsfMessage(m, consumerID, queueName, maxDeliveries, sess);
if (trace) { log.trace("added " + m + " to session"); }
}
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -224,7 +224,7 @@
MessageHandler.callOnMessage(session, distinguishedListener, holder.getConsumerID(),
false,
holder.getMsg(), ackMode, holder.getMaxDeliveries(),
- holder.getConnectionConsumerSession(), holder.isShouldAck());
+ holder.getConnectionConsumerSession());
}
}
}
@@ -497,12 +497,12 @@
* with messages to be processed by the session's run() method
*/
void addAsfMessage(JBossMessage m, String consumerID, String queueName, int maxDeliveries,
- ClientSession connectionConsumerSession, boolean shouldAck) throws JMSException
+ ClientSession connectionConsumerSession) throws JMSException
{
AsfMessageHolder holder =
new AsfMessageHolder(m, consumerID, queueName, maxDeliveries,
- connectionConsumerSession, shouldAck);
+ connectionConsumerSession);
if (asfMessages == null)
{
Modified: trunk/src/main/org/jboss/jms/client/MessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/MessageHandler.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/MessageHandler.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -54,18 +54,17 @@
JBossMessage m,
int ackMode,
int maxDeliveries,
- ClientSession connectionConsumerSession,
- boolean shouldAck)
+ ClientSession connectionConsumerSession)
throws JMSException
{
- if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+ if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries))
{
// Message has been cancelled
return;
}
DeliveryInfo deliveryInfo =
- new DeliveryInfo(m, consumerID, connectionConsumerSession, shouldAck);
+ new DeliveryInfo(m, consumerID, connectionConsumerSession);
m.incDeliveryCount();
@@ -114,7 +113,7 @@
public static boolean checkExpiredOrReachedMaxdeliveries(JBossMessage jbm,
ClientSession del,
- int maxDeliveries, boolean shouldCancel)
+ int maxDeliveries)
{
Message msg = jbm.getCoreMessage();
@@ -135,21 +134,18 @@
log.trace(msg + " has reached maximum delivery number " + maxDeliveries +", cancelling to server");
}
}
-
- if (shouldCancel)
- {
- final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
- expired, reachedMaxDeliveries);
- try
- {
- del.cancelDelivery(cancel);
- }
- catch (JMSException e)
- {
- log.error("Failed to cancel delivery", e);
- }
+
+ final Cancel cancel = new CancelImpl(jbm.getDeliveryId(), jbm.getDeliveryCount(),
+ expired, reachedMaxDeliveries);
+ try
+ {
+ del.cancelDelivery(cancel);
}
-
+ catch (JMSException e)
+ {
+ log.error("Failed to cancel delivery", e);
+ }
+
return true;
}
else
Modified: trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/api/ClientConsumer.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -40,8 +40,6 @@
int getMaxDeliveries();
- boolean isShouldAck();
-
void handleMessage(JBossMessage message) throws Exception;
void addToFrontOfBuffer(JBossMessage message) throws JMSException;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -89,8 +89,7 @@
private QueuedExecutor sessionExecutor;
private boolean listenerRunning;
private long lastDeliveryId = -1;
- private boolean waitingForLastDelivery;
- private boolean shouldAck;
+ private boolean waitingForLastDelivery;
private int consumeCount;
private MessagingRemotingConnection remotingConnection;
@@ -105,8 +104,7 @@
Destination dest,
String selector, boolean noLocal,
boolean isCC, QueuedExecutor sessionExecutor,
- MessagingRemotingConnection remotingConnection,
- boolean shouldAck)
+ MessagingRemotingConnection remotingConnection)
{
this.id = id;
this.session = session;
@@ -118,7 +116,6 @@
this.noLocal = noLocal;
this.isConnectionConsumer = isCC;
this.sessionExecutor = sessionExecutor;
- this.shouldAck = shouldAck;
this.remotingConnection = remotingConnection;
}
@@ -329,11 +326,11 @@
if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
boolean ignore =
- MessageHandler.checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries, shouldAck);
+ MessageHandler.checkExpiredOrReachedMaxdeliveries(m, session, maxDeliveries);
if (!isConnectionConsumer && !ignore)
{
- DeliveryInfo info = new DeliveryInfo(m, id, null, shouldAck);
+ DeliveryInfo info = new DeliveryInfo(m, id, null);
session.preDeliver(info);
@@ -405,11 +402,6 @@
}
}
- public boolean isShouldAck()
- {
- return this.shouldAck;
- }
-
public void handleMessage(final JBossMessage message) throws Exception
{
synchronized (mainLock)
@@ -499,7 +491,7 @@
// consumer's deliveries until then), which is too late - since we need to preserve the
// order of messages delivered in a session.
- if (shouldAck && !buffer.isEmpty())
+ if (!buffer.isEmpty())
{
// Now we cancel any deliveries that might be waiting in our buffer. This is because
// otherwise the messages wouldn't get cancelled until the corresponding session died.
@@ -836,7 +828,7 @@
try
{
MessageHandler.callOnMessage(session, theListener, id,
- false, msg, session.getAcknowledgeMode(), maxDeliveries, null, shouldAck);
+ false, msg, session.getAcknowledgeMode(), maxDeliveries, null);
if (trace) { log.trace("Called callonMessage"); }
}
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -423,14 +423,12 @@
CreateConsumerResponse response = (CreateConsumerResponse)remotingConnection.sendBlocking(id, request);
- boolean shouldAck = !(destination.getType() == DestinationType.TOPIC && subscriptionName == null);
-
ClientConsumer consumer =
new ClientConsumerImpl(this, response.getConsumerID(), response.getBufferSize(),
response.getMaxDeliveries(), response.getRedeliveryDelay(),
destination,
selector, noLocal,
- isCC, executor, remotingConnection, shouldAck);
+ isCC, executor, remotingConnection);
children.put(response.getConsumerID(), consumer);
@@ -1010,37 +1008,27 @@
private boolean ackDelivery(DeliveryInfo delivery) throws JMSException
{
- if (delivery.isShouldAck())
- {
- ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
+ ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
- //If the delivery was obtained via a connection consumer we need to ack via that
- //otherwise we just use this session
+ //If the delivery was obtained via a connection consumer we need to ack via that
+ //otherwise we just use this session
- ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
+ ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
- return sessionToUse.acknowledgeDelivery(delivery);
- }
- else
- {
- return true;
- }
+ return sessionToUse.acknowledgeDelivery(delivery);
}
private void cancelDelivery(DeliveryInfo delivery) throws JMSException
{
- if (delivery.isShouldAck())
- {
- ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
+ ClientSession connectionConsumerSession = delivery.getConnectionConsumerSession();
- //If the delivery was obtained via a connection consumer we need to cancel via that
- //otherwise we just use this session
+ //If the delivery was obtained via a connection consumer we need to cancel via that
+ //otherwise we just use this session
- ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
+ ClientSession sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : this;
- sessionToUse.cancelDelivery(new CancelImpl(delivery.getDeliveryID(),
- delivery.getMessage().getDeliveryCount(), false, false));
- }
+ sessionToUse.cancelDelivery(new CancelImpl(delivery.getDeliveryID(),
+ delivery.getMessage().getDeliveryCount(), false, false));
}
private void internalCancelDeliveries( List deliveryInfos) throws JMSException
@@ -1051,14 +1039,11 @@
{
DeliveryInfo ack = (DeliveryInfo)i.next();
- if (ack.isShouldAck())
- {
- CancelImpl cancel = new CancelImpl(ack.getMessage().getDeliveryId(),
- ack.getMessage().getDeliveryCount(),
- false, false);
+ CancelImpl cancel = new CancelImpl(ack.getMessage().getDeliveryId(),
+ ack.getMessage().getDeliveryCount(),
+ false, false);
- cancels.add(cancel);
- }
+ cancels.add(cancel);
}
if (!cancels.isEmpty())
@@ -1069,22 +1054,10 @@
private void acknowledgeDeliveries(ClientSession del, List deliveryInfos) throws JMSException
{
- List acks = new ArrayList();
-
- for (Iterator i = deliveryInfos.iterator(); i.hasNext(); )
+ if (!deliveryInfos.isEmpty())
{
- DeliveryInfo ack = (DeliveryInfo)i.next();
-
- if (ack.isShouldAck())
- {
- acks.add(ack);
- }
+ del.acknowledgeDeliveries(deliveryInfos);
}
-
- if (!acks.isEmpty())
- {
- del.acknowledgeDeliveries(acks);
- }
}
// Inner Classes --------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/client/impl/DeliveryInfo.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -45,9 +45,6 @@
private JBossMessage msg;
- //For messages in non durable subscriptions - there is no need to ack on the server
- private boolean shouldAck;
-
//When using the evil abomination known as a ConnectionConsumer, the connection consumer
//will get from a session that it created, then pass them onto sessions got from the pool
//this means when the messages are acked/cancelled then this needs to be done against
@@ -62,15 +59,13 @@
// Constructors --------------------------------------------------
public DeliveryInfo(JBossMessage msg, String consumerId,
- ClientSession connectionConsumerSession, boolean shouldAck)
+ ClientSession connectionConsumerSession)
{
this.msg = msg;
this.consumerId = consumerId;
this.connectionConsumerSession = connectionConsumerSession;
-
- this.shouldAck = shouldAck;
}
// Public --------------------------------------------------------
@@ -90,11 +85,6 @@
return connectionConsumerSession;
}
- public boolean isShouldAck()
- {
- return shouldAck;
- }
-
public String toString()
{
return "Delivery[" + getDeliveryID() + ", " + msg + "]";
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionEndpoint.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -52,8 +52,6 @@
void stop() throws JMSException;
- void sendTransaction(TransactionRequest request) throws JMSException;
-
- MessagingXid[] getPreparedTransactions() throws JMSException; ;
+ void sendTransaction(TransactionRequest request) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -202,14 +202,14 @@
public CreateSessionResponse createSession(boolean transacted,
int acknowledgmentMode,
- boolean isXA)
+ boolean xa)
throws JMSException
{
try
{
log.trace(this + " creating " + (transacted ? "transacted" : "non transacted") +
" session, " + Util.acknowledgmentMode(acknowledgmentMode) + ", " +
- (isXA ? "XA": "non XA"));
+ (xa ? "XA": "non XA"));
if (closed)
{
@@ -222,7 +222,7 @@
// connection endpoint instance
//Note we only replicate transacted and client acknowledge sessions.
- ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this);
+ ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this, transacted, xa);
synchronized (sessions)
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -109,12 +109,8 @@
// Must be volatile
private volatile boolean clientAccepting;
- private boolean retainDeliveries;
-
private long lastDeliveryID = -1;
- private volatile boolean dead;
-
private int prefetchSize;
private volatile int sendCount;
@@ -163,19 +159,6 @@
this.filter = filter;
- //FIXME - we shouldn't have checks like this on the server side
- //It should be the jms client that decides whether to retain deliveries or not
- if (destination.getType() == DestinationType.TOPIC && !messageQueue.isDurable())
- {
- // This is a consumer of a non durable topic subscription. We don't need to store
- // deliveries since if the consumer is closed or dies the refs go too.
- this.retainDeliveries = false;
- }
- else
- {
- this.retainDeliveries = true;
- }
-
this.started = this.sessionEndpoint.getConnectionEndpoint().isStarted();
// adding the consumer to the queue
@@ -415,11 +398,6 @@
return this.id;
}
- boolean isRetainDeliveries()
- {
- return this.retainDeliveries;
- }
-
void setLastDeliveryID(long id)
{
this.lastDeliveryID = id;
@@ -431,16 +409,6 @@
this.started = started;
}
- void setDead()
- {
- dead = true;
- }
-
- boolean isDead()
- {
- return dead;
- }
-
Queue getDLQ()
{
return dlq;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -112,7 +112,6 @@
* Session implementation
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
* Parts derived from JBM 1.x ServerSessionEndpoint by
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -133,8 +132,6 @@
static final String TEMP_QUEUE_MESSAGECOUNTER_PREFIX = "TempQueue.";
- private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
-
private static final long CLOSE_WAIT_TIMEOUT = 5 * 1000;
// Static ---------------------------------------------------------------------------------------
@@ -153,18 +150,15 @@
private MessagingServer sp;
- private Map consumers;
- private Map browsers;
+ private Map consumers = new HashMap();
+ private Map browsers = new HashMap();
private PostOffice postOffice;
- private int nodeId;
private int defaultMaxDeliveryAttempts;
private long defaultRedeliveryDelay;
private Queue defaultDLQ;
private Queue defaultExpiryQueue;
- private Object deliveryLock = new Object();
-
// Map <deliveryID, Delivery>
private Map deliveries;
@@ -173,15 +167,18 @@
//Temporary until we have our own NIO transport
QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
- private LinkedQueue toDeliver = new LinkedQueue();
-
- private boolean waitingToClose = false;
-
private Object waitLock = new Object();
+
+ private Transaction tx;
+
+ private boolean transacted;
+
+ private boolean xa;
// Constructors ---------------------------------------------------------------------------------
- ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
+ ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
+ boolean transacted, boolean xa) throws Exception
{
this.id = sessionID;
@@ -191,12 +188,6 @@
postOffice = sp.getPostOffice();
- nodeId = sp.getConfiguration().getMessagingServerID();
-
- consumers = new HashMap();
-
- browsers = new HashMap();
-
defaultDLQ = sp.getDefaultDLQInstance();
defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
@@ -208,6 +199,15 @@
deliveries = new ConcurrentHashMap();
deliveryIdSequence = new SynchronizedLong(0);
+
+ this.transacted = transacted;
+
+ this.xa = xa;
+
+ if (transacted && !xa)
+ {
+ tx = new TransactionImpl();
+ }
}
// SessionDelegate implementation ---------------------------------------------------------------
@@ -853,35 +853,14 @@
if (trace) { log.trace("Delivery id is now " + deliveryId); }
- //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
- if (consumer.isRetainDeliveries())
- {
- // Add a delivery
+ // Add a delivery
- rec = new DeliveryRecord(ref, consumer, deliveryId);
+ rec = new DeliveryRecord(ref, consumer, deliveryId);
- deliveries.put(new Long(deliveryId), rec);
+ deliveries.put(new Long(deliveryId), rec);
- if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + ref); }
- }
- else
- {
- //Acknowledge it now
- try
- {
- //This basically just releases the memory reference
+ if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + ref); }
- if (trace) { log.trace("Acknowledging delivery now"); }
-
- ref.acknowledge(connectionEndpoint.getMessagingServer().getPersistenceManager());
- }
- catch (Throwable t)
- {
- log.error("Failed to acknowledge delivery", t);
- }
- }
-
-
performDelivery(ref, deliveryId, consumer);
}
@@ -894,12 +873,6 @@
return;
}
- if (consumer.isDead())
- {
- //Ignore any responses that come back after consumer has died
- return;
- }
-
if (trace) { log.trace(this + " performing delivery for " + ref); }
// We send the message to the client on the current thread. The message is written onto the
@@ -946,17 +919,6 @@
{
long id = ack.getDeliveryID();
- //TODO - do this more elegantly
- if (ack instanceof DeliveryInfo)
- {
- if (!((DeliveryInfo)ack).isShouldAck())
- {
- //If we are in VM then acks for non durable subs will still exist - this
- //won't happen remoptely since they are not written to the wire
- continue;
- }
- }
-
DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
DeliveryCallback cb = new DeliveryCallback(id);
@@ -1006,11 +968,7 @@
try
{
- //Prompting delivery must be asynchronous to avoid deadlock
- //but we cannot use one way invocations on cancelDelivery and
- //cancelDeliveries because remoting one way invocations can
- //overtake each other in flight - this problem will
- //go away when we have our own transport and our dedicated connection
+ //TODO - do we really need to prompt on a different thread?
this.executor.execute(new Runnable() { public void run() { queue.deliver();} } );
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -25,6 +25,7 @@
import java.util.List;
import javax.jms.JMSException;
+import javax.transaction.xa.Xid;
import org.jboss.jms.client.Closeable;
import org.jboss.jms.client.impl.Ack;
@@ -120,5 +121,27 @@
int getDupsOKBatchSize();
public boolean isStrictTck();
+
+// public void XAStart(Xid xid) throws JMSException;
+//
+// public void XAEnd(Xid xid) throws JMSException;
+//
+// public void XASuspend(Xid xid) throws JMSException;
+//
+// public void XAJoin(Xid xid) throws JMSException;
+//
+// public void XAResume(Xid xid) throws JMSException;
+//
+// public void XAPrepare(Xid xid) throws JMSException;
+//
+// public void XACommit(Xid xid, boolean onePhase) throws JMSException;
+//
+// public void XARollback(Xid xid) throws JMSException;
+//
+// public List<Xid> XARecover() throws JMSException;
+//
+// public void XASetTxTimeout(int seconds) throws JMSException;
+//
+// public int XAGetTimeout() throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -67,13 +67,6 @@
private boolean clientSide;
- private boolean hasPersistentAcks;
-
- private boolean failedOver;
-
- private boolean removeAcks;
-
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -109,29 +102,9 @@
}
SessionTxState sessionTxState = getSessionTxState(sessionId);
- sessionTxState.addAck(info);
-
- if (info.getMessage().getCoreMessage().isDurable())
- {
- hasPersistentAcks = true;
- }
-
- if (!info.isShouldAck())
- {
- removeAcks = true;
- }
+ sessionTxState.addAck(info);
}
- public boolean hasPersistentAcks()
- {
- return hasPersistentAcks;
- }
-
- public boolean isFailedOver()
- {
- return failedOver;
- }
-
public void clearMessages()
{
if (!clientSide)
@@ -178,47 +151,8 @@
}
}
- /*
- * Substitute newSessionID for oldSessionID
- */
- public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
- {
- if (!clientSide)
- {
- throw new IllegalStateException("Cannot call this method on the server side");
- }
-
- // Note we have to do this in one go since there may be overlap between old and new session
- // IDs and we don't want to overwrite keys in the map.
+
- Map<String, SessionTxState> tmpMap = null;
-
- if (sessionStatesMap != null)
- {
- for (SessionTxState state: sessionStatesMap.values())
- {
- boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
-
- if (handled)
- {
- if (tmpMap == null)
- {
- tmpMap = new LinkedHashMap<String, SessionTxState>();
- }
- tmpMap.put(newSessionID, state);
- }
- }
- }
-
- if (tmpMap != null)
- {
- // swap
- sessionStatesMap = tmpMap;
- }
-
- failedOver = true;
- }
-
/**
* May return an empty list, but never null.
*/
@@ -293,12 +227,8 @@
{
DeliveryInfo ack = (DeliveryInfo)iter2.next();
- //We don't want to send acks for things like non durable subs which will have been already acked
- if (ack.isShouldAck())
- {
- //We only need the delivery id written
- out.writeLong(ack.getMessage().getDeliveryId());
- }
+ //We only need the delivery id written
+ out.writeLong(ack.getMessage().getDeliveryId());
}
//Marker for end of acks
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-23 05:28:28 UTC (rev 3616)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2008-01-23 09:40:25 UTC (rev 3617)
@@ -136,20 +136,8 @@
tx.addMessage(sessionId, m);
}
+
/*
- * Failover session from old session ID -> new session ID
- */
- public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
- {
- for (Iterator i = transactions.values().iterator(); i.hasNext(); )
- {
- ClientTransaction tx = (ClientTransaction)i.next();
-
- tx.handleFailover(newServerID, oldSessionID, newSessionID);
- }
- }
-
- /*
* Get all the deliveries corresponding to the session ID
*/
public List getDeliveriesForSession(String sessionID)
@@ -167,8 +155,7 @@
return ackInfos;
}
-
-
+
/**
* Add an acknowledgement to the transaction
*
More information about the jboss-cvs-commits mailing list