[jboss-cvs] JBoss Messaging SVN: r2892 - in trunk/src/main/org/jboss: jms/tx and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 13 12:55:14 EDT 2007
Author: timfox
Date: 2007-07-13 12:55:14 -0400 (Fri, 13 Jul 2007)
New Revision: 2892
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
Log:
Fixed issue with 2pc rollback
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-13 16:55:14 UTC (rev 2892)
@@ -44,6 +44,7 @@
import org.jboss.jms.delegate.BrowserDelegate;
import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.DeliveryRecovery;
import org.jboss.jms.delegate.SessionEndpoint;
import org.jboss.jms.destination.JBossDestination;
@@ -1528,7 +1529,20 @@
for(Iterator i = acks.iterator(); i.hasNext(); )
{
Ack ack = (Ack)i.next();
+
Long id = new Long(ack.getDeliveryID());
+
+ //TODO - do this more elegantly
+ if (ack instanceof DeliveryInfo)
+ {
+ if (!((DeliveryInfo)ack).isShouldAck())
+ {
+ //If we are in VM then acks for non durable subs will still exist - this
+ //won't happen remotely since they are not written to the wire
+ continue;
+ }
+ }
+
DeliveryRecord rec = (DeliveryRecord)deliveries.get(id);
if (rec == null)
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-07-13 16:55:14 UTC (rev 2892)
@@ -250,45 +250,6 @@
// Streamable implementation ---------------------------------
- //For message consumed using a non durable subscriber - we don't need to ack to the server
- //so we remove them here
- //TODO this could be optimised to prevent this extra removal stage before sending
- public void removeUnnecessaryAcks()
- {
- if (removeAcks)
- {
- Iterator iter = sessionStatesMap.values().iterator();
-
- while (iter.hasNext())
- {
- SessionTxState state = (SessionTxState)iter.next();
-
- List acks = state.getAcks();
-
- Iterator iter2 = acks.iterator();
-
- List newAcks = new ArrayList();
-
- while (iter2.hasNext())
- {
- DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
- if (ack.isShouldAck())
- {
- if (newAcks == null)
- {
- newAcks = new ArrayList();
- }
- newAcks.add(ack);
- }
- }
-
- state.setAcks(newAcks);
- }
- }
- }
-
-
public void write(DataOutputStream out) throws Exception
{
out.writeByte(state);
@@ -326,17 +287,22 @@
List acks = state.getAcks();
- out.writeInt(acks.size());
-
iter2 = acks.iterator();
while (iter2.hasNext())
{
DeliveryInfo ack = (DeliveryInfo)iter2.next();
-
- //We only need the delivery id written
- out.writeLong(ack.getMessageProxy().getDeliveryId());
+
+ //We don't want to send acks for things like non durable subs which will have been already acked
+ if (ack.isShouldAck())
+ {
+ //We only need the delivery id written
+ out.writeLong(ack.getMessageProxy().getDeliveryId());
+ }
}
+
+ //Marker for end of acks
+ out.writeLong(Long.MIN_VALUE);
}
}
}
@@ -375,13 +341,11 @@
sessionState.addMessage(msg);
}
- int numAcks = in.readInt();
-
- for (int j = 0; j < numAcks; j++)
+ long l;
+
+ while ((l = in.readLong()) != Long.MIN_VALUE)
{
- long ack = in.readLong();
-
- sessionState.addAck(new DefaultAck(ack));
+ sessionState.addAck(new DefaultAck(l));
}
}
}
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-07-13 16:55:14 UTC (rev 2892)
@@ -205,8 +205,6 @@
try
{
- request.state.removeUnnecessaryAcks();
-
connection.sendTransaction(request, false);
// If we get this far we can remove the transaction
@@ -617,6 +615,10 @@
del.redeliver(acks);
}
+ else
+ {
+ log.info("There are no messages to redeliver");
+ }
}
}
@@ -630,11 +632,6 @@
{
try
{
- if (request.state != null)
- {
- request.state.removeUnnecessaryAcks();
- }
-
connection.sendTransaction(request, false);
}
catch (Throwable t)
Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java 2007-07-13 13:34:30 UTC (rev 2891)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java 2007-07-13 16:55:14 UTC (rev 2892)
@@ -67,17 +67,13 @@
protected void startService() throws Exception
{
+ InitialContext ic = null;
try
{
+ ic = new InitialContext();
+ ds = (DataSource)ic.lookup(dataSourceJNDIName);
if (ds == null)
{
- InitialContext ic = new InitialContext();
- ds = (DataSource)ic.lookup(dataSourceJNDIName);
- ic.close();
- }
-
- if (ds == null)
- {
throw new IllegalStateException("No DataSource found. This service dependencies must " +
"have not been enforced correctly!");
}
@@ -87,6 +83,13 @@
{
throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
}
+ finally
+ {
+ if (ic != null)
+ {
+ ic.close();
+ }
+ }
}
protected void stopService() throws Exception
More information about the jboss-cvs-commits
mailing list