[jboss-cvs] JBoss Messaging SVN: r5902 - in trunk/src/main/org/jboss/messaging/ra: inflow and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Feb 19 10:30:08 EST 2009
Author: jesper.pedersen
Date: 2009-02-19 10:30:08 -0500 (Thu, 19 Feb 2009)
New Revision: 5902
Modified:
trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
Log:
[JBMESSAGING-1367] Create JCA resource adapter for JBM 2.0
Modified: trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java 2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/JBMManagedConnection.java 2009-02-19 15:30:08 UTC (rev 5902)
@@ -188,7 +188,7 @@
if (isDestroyed.get())
throw new IllegalStateException("The managed connection is already destroyed");
- Object session = null; // TODO - get one from JBossConnection -- new JBMSession(this, (JBMConnectionRequestInfo)cxRequestInfo);
+ Object session = new JBMSession(this, (JBMConnectionRequestInfo)cxRequestInfo);
handles.add(session);
return session;
}
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java 2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMActivationSpec.java 2009-02-19 15:30:08 UTC (rev 5902)
@@ -137,7 +137,7 @@
password = null;
maxMessages = Integer.valueOf(1);
minSession = Integer.valueOf(1);
- maxSession = Integer.valueOf(1);
+ maxSession = Integer.valueOf(15);
keepAlive = Long.valueOf(60000);
sessionTransacted = Boolean.TRUE;
reconnectAttempts = Integer.valueOf(5);
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-02-19 15:30:08 UTC (rev 5902)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.ra.inflow;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -32,6 +34,10 @@
import javax.jms.XASession;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkEvent;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -46,7 +52,7 @@
* @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
* @version $Revision: $
*/
-public class JBMMessageHandler implements MessageListener
+public class JBMMessageHandler implements MessageListener, Work, WorkListener
{
/** The logger */
private static final Logger log = Logger.getLogger(JBMMessageHandler.class);
@@ -57,6 +63,12 @@
/** The message handler pool */
private JBMMessageHandlerPool pool;
+ /** Is in use */
+ private AtomicBoolean inUse;
+
+ /** Done latch */
+ private CountDownLatch done;
+
/** The transacted flag */
private boolean transacted;
@@ -97,6 +109,9 @@
{
if (trace)
log.trace("setup()");
+
+ inUse = new AtomicBoolean(false);
+ done = new CountDownLatch(1);
JBMActivation activation = pool.getActivation();
JBMActivationSpec spec = activation.getActivationSpec();
@@ -153,9 +168,6 @@
endpoint = endpointFactory.createEndpoint(xaResource);
- // Create the transaction demarcation strategy
- txnStrategy = createTransactionDemarcation();
-
// Set the message listener
messageConsumer.setMessageListener(this);
}
@@ -200,6 +212,18 @@
}
/**
+ * Is in use
+ * @return True if in use; otherwise false
+ */
+ public boolean isInUse()
+ {
+ if (trace)
+ log.trace("isInUse()");
+
+ return inUse.get();
+ }
+
+ /**
* On message
* @param message The message
*/
@@ -207,6 +231,8 @@
{
if (trace)
log.trace("onMessage(" + message + ")");
+
+ inUse.set(true);
try
{
@@ -229,6 +255,51 @@
if (txnStrategy != null)
txnStrategy.error();
}
+
+ inUse.set(false);
+ done.countDown();
+ }
+
+ /**
+ * Run
+ */
+ public void run()
+ {
+ if (trace)
+ log.trace("run()");
+
+ try
+ {
+ setup();
+
+ txnStrategy = createTransactionDemarcation();
+ }
+ catch (Throwable t)
+ {
+ log.error("Error creating transaction demarcation. Cannot continue.");
+ return;
+ }
+
+ try
+ {
+ // Wait for onMessage
+ while (done.getCount() > 0)
+ {
+ try
+ {
+ done.wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ if (txnStrategy != null)
+ txnStrategy.error();
+
+ }
finally
{
if (txnStrategy != null)
@@ -236,11 +307,64 @@
txnStrategy = null;
}
+ }
+ /**
+ * Release
+ */
+ public void release()
+ {
+ if (trace)
+ log.trace("release()");
+ }
+
+ /**
+ * Work accepted
+ * @param e The work event
+ */
+ public void workAccepted(WorkEvent e)
+ {
+ if (trace)
+ log.trace("workAccepted()");
+ }
+
+ /**
+ * Work completed
+ * @param e The work event
+ */
+ public void workCompleted(WorkEvent e)
+ {
+ if (trace)
+ log.trace("workCompleted()");
+
+ teardown();
pool.removeHandler(this);
}
/**
+ * Work rejected
+ * @param e The work event
+ */
+ public void workRejected(WorkEvent e)
+ {
+ if (trace)
+ log.trace("workRejected()");
+
+ teardown();
+ pool.removeHandler(this);
+ }
+
+ /**
+ * Work started
+ * @param e The work event
+ */
+ public void workStarted(WorkEvent e)
+ {
+ if (trace)
+ log.trace("workStarted()");
+ }
+
+ /**
* Create the transaction demarcation strategy
* @return The strategy
*/
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java 2009-02-19 15:04:28 UTC (rev 5901)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandlerPool.java 2009-02-19 15:30:08 UTC (rev 5902)
@@ -25,6 +25,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.resource.spi.work.WorkManager;
+
import org.jboss.messaging.core.logging.Logger;
/**
@@ -46,7 +48,7 @@
private JBMActivation activation;
/** The active sessions */
- private List<JBMMessageHandler> activeSessions;
+ private ArrayList<JBMMessageHandler> activeSessions;
/** Whether the pool is stopped */
private AtomicBoolean stopped;
@@ -114,7 +116,6 @@
synchronized (activeSessions)
{
- handler.teardown();
activeSessions.remove(handler);
if (!stopped.get())
@@ -128,8 +129,8 @@
log.error("Unable to restart handler", e);
}
}
+ activeSessions.notifyAll();
}
- activeSessions.notifyAll();
}
/**
@@ -162,9 +163,11 @@
if (trace)
log.trace("setupSession()");
+ WorkManager workManager = activation.getWorkManager();
+
// Create the session
JBMMessageHandler handler = new JBMMessageHandler(this);
- handler.setup();
+ workManager.scheduleWork(handler, 0, null, handler);
activeSessions.add(handler);
}
@@ -179,6 +182,22 @@
synchronized (activeSessions)
{
+ List<JBMMessageHandler> cloned = (List<JBMMessageHandler>)activeSessions.clone();
+ for (int i = 0; i < cloned.size(); ++i)
+ {
+ JBMMessageHandler handler = cloned.get(i);
+ if (!handler.isInUse())
+ {
+ handler.teardown();
+ activeSessions.remove(handler);
+ }
+ }
+
+ activeSessions.notifyAll();
+ }
+
+ synchronized (activeSessions)
+ {
if (activation.getActivationSpec().isForceClearOnShutdown())
{
int attempts = 0;
More information about the jboss-cvs-commits
mailing list