[jboss-cvs] JBossAS SVN: r100711 - in trunk/connector: src/main/java/org/jboss/resource/work and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 8 13:05:54 EST 2010


Author: jesper.pedersen
Date: 2010-02-08 13:05:53 -0500 (Mon, 08 Feb 2010)
New Revision: 100711

Modified:
   trunk/connector/pom.xml
   trunk/connector/src/main/java/org/jboss/resource/work/JBossWorkManager.java
   trunk/connector/src/main/java/org/jboss/resource/work/WorkWrapper.java
Log:
[JBAS-7711] Backport JBoss Thread based WorkManager

Modified: trunk/connector/pom.xml
===================================================================
--- trunk/connector/pom.xml	2010-02-08 17:15:46 UTC (rev 100710)
+++ trunk/connector/pom.xml	2010-02-08 18:05:53 UTC (rev 100711)
@@ -239,6 +239,10 @@
       <artifactId>jboss-security-spi</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.jboss.threads</groupId>
+      <artifactId>jboss-threads</artifactId>
+    </dependency>
+    <dependency>
       <groupId>quartz</groupId>
       <artifactId>quartz</artifactId>
     </dependency>

Modified: trunk/connector/src/main/java/org/jboss/resource/work/JBossWorkManager.java
===================================================================
--- trunk/connector/src/main/java/org/jboss/resource/work/JBossWorkManager.java	2010-02-08 17:15:46 UTC (rev 100710)
+++ trunk/connector/src/main/java/org/jboss/resource/work/JBossWorkManager.java	2010-02-08 18:05:53 UTC (rev 100711)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
  * as indicated by the @author tags. See the copyright.txt file in the
  * distribution for a full listing of individual contributors.
  *
@@ -21,18 +21,25 @@
  */
 package org.jboss.resource.work;
 
-import java.util.concurrent.Executor;
-import org.jboss.system.ServiceMBeanSupport;
-import org.jboss.tm.JBossXATerminator;
-import org.jboss.util.threadpool.Task;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import javax.resource.spi.work.ExecutionContext;
 import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkCompletedException;
+import javax.resource.spi.work.WorkEvent;
 import javax.resource.spi.work.WorkException;
 import javax.resource.spi.work.WorkListener;
 import javax.resource.spi.work.WorkManager;
+import javax.resource.spi.work.WorkRejectedException;
 import javax.transaction.xa.Xid;
 
+import org.jboss.logging.Logger;
+import org.jboss.threads.BlockingExecutor;
+import org.jboss.threads.ExecutionTimedOutException;
+import org.jboss.tm.JBossXATerminator;
+import org.jboss.system.ServiceMBeanSupport;
+
 /**
  * The work manager implementation
  *
@@ -41,39 +48,46 @@
  */
 public class JBossWorkManager extends ServiceMBeanSupport implements WorkManager, JBossWorkManagerMBean
 {
+   /** The logger */
+   private static Logger log = Logger.getLogger(JBossWorkManager.class);
+   
    /** Whether trace is enabled */
-   private boolean trace = log.isTraceEnabled();
-   
-   /** The thread pool */
-   private Executor executor;
+   private static boolean trace = log.isTraceEnabled();
 
-   /** The xa terminator */
+   /** The executor */
+   private BlockingExecutor executor;
+
+   /** The XA terminator */
    private JBossXATerminator xaTerminator;
 
-    /**
-    * Retrieve the thread pool
-    *
-    * @return the thread pool
+   /**
+    * Constructor
     */
-   public Executor getExecutor()
+   public JBossWorkManager()
    {
+   }
+   
+   /**
+    * Retrieve the executor
+    * @return The executor
+    */
+   public BlockingExecutor getExecutor()
+   {
       return executor;
    }
 
    /**
-    * Set the thread pool
-    *
-    * @param executor the thread pool
+    * Set the executor
+    * @param executor The executor
     */
-   public void setExecutor(Executor executor)
+   public void setExecutor(BlockingExecutor executor)
    {
       this.executor = executor;
    }
 
    /**
     * Get the XATerminator
-    * 
-    * @return the xa terminator
+    * @return The XA terminator
     */
    public JBossXATerminator getXATerminator()
    {
@@ -82,179 +96,311 @@
 
    /**
     * Set the XATerminator
-    * 
-    * @param xaTerminator the xa terminator
+    * @param xaTerminator The XA terminator
     */
    public void setXATerminator(JBossXATerminator xaTerminator)
    {
       this.xaTerminator = xaTerminator;
    }
 
+   /**
+    * Get the work manager instance
+    * @return The instance
+    */
    public WorkManager getInstance()
    {
       return this;
    }
 
-   public void doWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
-   {
-      if (ctx == null)
-         ctx = new ExecutionContext();
-      WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_FOR_COMPLETE, startTimeout, ctx, listener);
-      importWork(wrapper);
-      executeWork(wrapper);
-      if (wrapper.getWorkException() != null)
-         throw wrapper.getWorkException();
-   }
-
+   /**
+    * {@inheritDoc}
+    */
    public void doWork(Work work) throws WorkException
    {
       doWork(work, WorkManager.INDEFINITE, null, null);
    }
-
-   public long startWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
+   
+   /**
+    * {@inheritDoc}
+    */
+   public void doWork(Work work,
+                      long startTimeout, 
+                      ExecutionContext execContext, 
+                      WorkListener workListener) 
+      throws WorkException
    {
-      if (ctx == null)
-         ctx = new ExecutionContext();
-      WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_FOR_START, startTimeout, ctx, listener);
-      importWork(wrapper);
-      executeWork(wrapper);
-      if (wrapper.getWorkException() != null)
-         throw wrapper.getWorkException();
-      return wrapper.getBlockedElapsed();
-   }
+      WorkException exception = null;
+      WorkWrapper wrapper = null;
+      try
+      {
+         if (work == null)
+            throw new WorkRejectedException("Work is null");
 
-   public long startWork(Work work) throws WorkException
-   {
-      return startWork(work, WorkManager.INDEFINITE, null, null);
-   }
+         if (startTimeout < 0)
+            throw new WorkRejectedException("StartTimeout is negative: " + startTimeout);
 
-   public void scheduleWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
-   {
-      if (ctx == null)
-         ctx = new ExecutionContext();
-      WorkWrapper wrapper = new WorkWrapper(this, work, Task.WAIT_NONE, startTimeout, ctx, listener);
-      importWork(wrapper);
-      executeWork(wrapper);
-      if (wrapper.getWorkException() != null)
-         throw wrapper.getWorkException();
-   }
+         if (execContext == null)
+         {
+            execContext = new ExecutionContext();  
+         }
 
-   public void scheduleWork(Work work) throws WorkException
-   {
-      scheduleWork(work, WorkManager.INDEFINITE, null, null);
-   }
+         final CountDownLatch completedLatch = new CountDownLatch(1);
 
-   /**
-    * Import any work
-    * 
-    * @param wrapper the work wrapper
-    * @throws WorkException for any error 
-    */
-   protected void importWork(WorkWrapper wrapper) throws WorkException
-   {
-      trace = log.isTraceEnabled();
-      if (trace)
-         log.trace("Importing work " + wrapper);
-      
-      ExecutionContext ctx = wrapper.getExecutionContext();
-      if (ctx != null)
+         wrapper = new WorkWrapper(this, work, execContext, workListener, null, completedLatch);
+
+         if (workListener != null)
+         {
+            WorkEvent event = new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null);
+            workListener.workAccepted(event);
+         }
+
+         BlockingExecutor executor = getExecutor(work);
+
+         if (startTimeout == WorkManager.INDEFINITE)
+         {
+            executor.executeBlocking(wrapper);
+         }
+         else
+         {
+            executor.executeBlocking(wrapper, startTimeout, TimeUnit.MILLISECONDS);
+         }
+
+         completedLatch.await();
+      }
+      catch (ExecutionTimedOutException etoe)
       {
-         Xid xid = ctx.getXid();
-         if (xid != null)
+         exception = new WorkRejectedException(etoe);
+         exception.setErrorCode(WorkRejectedException.START_TIMED_OUT);  
+      }
+      catch (RejectedExecutionException ree)
+      {
+         exception = new WorkRejectedException(ree);
+      }
+      catch (WorkException we)
+      {
+         exception = we;
+      }
+      catch (InterruptedException ie)
+      {
+         log.error(ie.getMessage(), ie);
+      }
+      finally
+      {
+         if (exception != null)
          {
-            //JBAS-4002 base value is in seconds as per the API, here we convert to millis
-            long timeout = (ctx.getTransactionTimeout() * 1000);
-            xaTerminator.registerWork(wrapper.getWork(), xid, timeout);
+            if (workListener != null)
+            {
+               WorkEvent event = new WorkEvent(this, WorkEvent.WORK_REJECTED, work, exception);
+               workListener.workRejected(event);
+            }
+
+            throw exception;
          }
+
+         checkWorkCompletionException(wrapper);
       }
-      if (trace)
-         log.trace("Imported work " + wrapper);
    }
    
    /**
-    * Execute the work
-    * 
-    * @param wrapper the work wrapper
-    * @throws WorkException for any error 
+    * {@inheritDoc}
     */
-   protected void executeWork(WorkWrapper wrapper) throws WorkException
+   public long startWork(Work work) throws WorkException
    {
-      if (trace)
-         log.trace("Submitting work to thread pool " + wrapper);
-
-      executor.execute(wrapper);
-
-      if (trace)
-         log.trace("Submitted work to thread pool " + wrapper);
+      return startWork(work, WorkManager.INDEFINITE, null, null);
    }
-
+   
    /**
-    * Start work
-    * 
-    * @param wrapper the work wrapper
-    * @throws WorkException for any error 
+    * {@inheritDoc}
     */
-   protected void startWork(WorkWrapper wrapper) throws WorkException
+   public long startWork(Work work, 
+                         long startTimeout, 
+                         ExecutionContext execContext, 
+                         WorkListener workListener) 
+      throws WorkException
    {
-      if (trace)
-         log.trace("Starting work " + wrapper);
+      WorkException exception = null;
+      WorkWrapper wrapper = null;
+      try
+      {
+         if (work == null)
+            throw new WorkRejectedException("Work is null");
 
-      ExecutionContext ctx = wrapper.getExecutionContext();
-      if (ctx != null)
+         if (startTimeout < 0)
+            throw new WorkRejectedException("StartTimeout is negative: " + startTimeout);
+
+         long started = System.currentTimeMillis();
+
+         if (execContext == null)
+         {
+            execContext = new ExecutionContext();  
+         }
+
+         final CountDownLatch startedLatch = new CountDownLatch(1);
+
+         wrapper = new WorkWrapper(this, work, execContext, workListener, startedLatch, null);
+
+         if (workListener != null)
+         {
+            WorkEvent event = new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null);
+            workListener.workAccepted(event);
+         }
+
+         BlockingExecutor executor = getExecutor(work);
+
+         if (startTimeout == WorkManager.INDEFINITE)
+         {
+            executor.executeBlocking(wrapper);
+         }
+         else
+         {
+            executor.executeBlocking(wrapper, startTimeout, TimeUnit.MILLISECONDS);
+         }
+
+         startedLatch.await();
+
+         return System.currentTimeMillis() - started;
+      }
+      catch (ExecutionTimedOutException etoe)
       {
-         Xid xid = ctx.getXid();
-         if (xid != null)
+         exception = new WorkRejectedException(etoe);
+         exception.setErrorCode(WorkRejectedException.START_TIMED_OUT);  
+      }
+      catch (RejectedExecutionException ree)
+      {
+         exception = new WorkRejectedException(ree);
+      }
+      catch (WorkException we)
+      {
+         exception = we;
+      }
+      catch (InterruptedException ie)
+      {
+         log.error(ie.getMessage(), ie);
+      }
+      finally
+      {
+         if (exception != null)
          {
-            xaTerminator.startWork(wrapper.getWork(), xid);
+            if (workListener != null)
+            {
+               WorkEvent event = new WorkEvent(this, WorkEvent.WORK_REJECTED, work, exception);
+               workListener.workRejected(event);
+            }
+
+            throw exception;
          }
+
+         checkWorkCompletionException(wrapper);
       }
-      if (trace)
-         log.trace("Started work " + wrapper);
+
+      return WorkManager.UNKNOWN;
    }
-
+   
    /**
-    * End work
-    * 
-    * @param wrapper the work wrapper
+    * {@inheritDoc}
     */
-   protected void endWork(WorkWrapper wrapper)
+   public void scheduleWork(Work work) throws WorkException
    {
-      if (trace)
-         log.trace("Ending work " + wrapper);
+      scheduleWork(work, WorkManager.INDEFINITE, null, null);
+   }
+   
+   /**
+    * {@inheritDoc}
+    */
+   public void scheduleWork(Work work,
+                            long startTimeout, 
+                            ExecutionContext execContext, 
+                            WorkListener workListener) 
+      throws WorkException
+   {
+      WorkException exception = null;
+      WorkWrapper wrapper = null;
+      try
+      {
+         if (work == null)
+            throw new WorkRejectedException("Work is null");
 
-      ExecutionContext ctx = wrapper.getExecutionContext();
-      if (ctx != null)
+         if (startTimeout < 0)
+            throw new WorkRejectedException("StartTimeout is negative: " + startTimeout);
+
+         if (execContext == null)
+         {
+            execContext = new ExecutionContext();  
+         }
+
+         wrapper = new WorkWrapper(this, work, execContext, workListener, null, null);
+
+         if (workListener != null)
+         {
+            WorkEvent event = new WorkEvent(this, WorkEvent.WORK_ACCEPTED, work, null);
+            workListener.workAccepted(event);
+         }
+
+         BlockingExecutor executor = getExecutor(work);
+
+         if (startTimeout == WorkManager.INDEFINITE)
+         {
+            executor.executeBlocking(wrapper);
+         }
+         else
+         {
+            executor.executeBlocking(wrapper, startTimeout, TimeUnit.MILLISECONDS);
+         }
+      }
+      catch (ExecutionTimedOutException etoe)
       {
-         Xid xid = ctx.getXid();
-         if (xid != null)
+         exception = new WorkRejectedException(etoe);
+         exception.setErrorCode(WorkRejectedException.START_TIMED_OUT);  
+      }
+      catch (RejectedExecutionException ree)
+      {
+         exception = new WorkRejectedException(ree);
+      }
+      catch (WorkException we)
+      {
+         exception = we;
+      }
+      catch (InterruptedException ie)
+      {
+         log.error(ie.getMessage(), ie);
+      }
+      finally
+      {
+         if (exception != null)
          {
-            xaTerminator.endWork(wrapper.getWork(), xid);
+            if (workListener != null)
+            {
+               WorkEvent event = new WorkEvent(this, WorkEvent.WORK_REJECTED, work, exception);
+               workListener.workRejected(event);
+            }
+
+            throw exception;
          }
+
+         checkWorkCompletionException(wrapper);
       }
-      if (trace)
-         log.trace("Ended work " + wrapper);
    }
 
    /**
-    * Cancel work
-    * 
-    * @param wrapper the work wrapper
+    * Get the executor
+    * @param work The work instance
+    * @return The executor
     */
-   protected void cancelWork(WorkWrapper wrapper)
+   private BlockingExecutor getExecutor(Work work)
    {
-      if (trace)
-         log.trace("Cancel work " + wrapper);
-
-      ExecutionContext ctx = wrapper.getExecutionContext();
-      if (ctx != null)
+      return executor;
+   }
+   
+   /**
+    * Checks work completed status. 
+    * @param wrapper work wrapper instance
+    * @throws {@link WorkException} if work is completed with an exception
+    */
+   private void checkWorkCompletionException(WorkWrapper wrapper) throws WorkException
+   {
+      if (wrapper.getWorkException() != null)
       {
-         Xid xid = ctx.getXid();
-         if (xid != null)
-         {
-            xaTerminator.cancelWork(wrapper.getWork(), xid);
-         }
-      }
-      if (trace)
-         log.trace("Canceled work " + wrapper);
+         throw wrapper.getWorkException();  
+      }      
    }
 }

Modified: trunk/connector/src/main/java/org/jboss/resource/work/WorkWrapper.java
===================================================================
--- trunk/connector/src/main/java/org/jboss/resource/work/WorkWrapper.java	2010-02-08 17:15:46 UTC (rev 100710)
+++ trunk/connector/src/main/java/org/jboss/resource/work/WorkWrapper.java	2010-02-08 18:05:53 UTC (rev 100711)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
  * as indicated by the @author tags. See the copyright.txt file in the
  * distribution for a full listing of individual contributors.
  *
@@ -21,60 +21,57 @@
  */
 package org.jboss.resource.work;
 
+import java.util.concurrent.CountDownLatch;
+
 import javax.resource.spi.work.ExecutionContext;
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkCompletedException;
 import javax.resource.spi.work.WorkEvent;
 import javax.resource.spi.work.WorkException;
 import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
 import javax.resource.spi.work.WorkRejectedException;
+import javax.transaction.xa.Xid;
 
 import org.jboss.logging.Logger;
-import org.jboss.util.JBossStringBuilder;
-import org.jboss.util.NestedRuntimeException;
-import org.jboss.util.threadpool.BasicTaskWrapper;
-import org.jboss.util.threadpool.StartTimeoutException;
-import org.jboss.util.threadpool.Task;
 
 /**
  * Wraps the resource adapter's work.
  *
- * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:jesper.pedersen at jboss.org">Jesper Pedersen</a>
  * @version $Revision$
  */
-public class WorkWrapper extends BasicTaskWrapper implements Task
+public class WorkWrapper implements Runnable
 {
    /** The log */
-   private static final Logger log = Logger.getLogger(WorkWrapper.class);
+   private static Logger log = Logger.getLogger(WorkWrapper.class);
 
    /** Whether we are tracing */
-   private boolean trace = log.isTraceEnabled();
+   private static boolean trace = log.isTraceEnabled();
    
    /** The work */
    private Work work;
 
    /** The execution context */
    private ExecutionContext executionContext;
-
+   
    /** the work listener */
-   private WorkListener workListener;
+   private WorkListener workListener;   
 
-   /** The start timeout */
-   private long startTimeout;
-
    /** The work manager */
    private JBossWorkManager workManager;
 
-   /** The wait type */
-   private int waitType;
-
    /** The blocked time */
    private long blockedTime;
 
    /** Any exception */
    private WorkException exception;
 
+   /** Started latch */
+   private CountDownLatch startedLatch;
+
+   /** Completed latch */
+   private CountDownLatch completedLatch;
+
    /**
     * Create a new WorkWrapper
     *
@@ -85,7 +82,12 @@
     * @param workListener the WorkListener
     * @throws IllegalArgumentException for null work, execution context or a negative start timeout
     */
-   public WorkWrapper(JBossWorkManager workManager, Work work, int waitType, long startTimeout, ExecutionContext executionContext, WorkListener workListener)
+   public WorkWrapper(JBossWorkManager workManager, 
+                      Work work, 
+                      ExecutionContext executionContext, 
+                      WorkListener workListener,
+                      CountDownLatch startedLatch,
+                      CountDownLatch completedLatch)
    {
       super();
 
@@ -93,19 +95,15 @@
          throw new IllegalArgumentException("Null work");
       if (executionContext == null)
          throw new IllegalArgumentException("Null execution context");
-      if (startTimeout < 0)
-         throw new IllegalArgumentException("Illegal start timeout: " + startTimeout);
 
       this.workManager = workManager;
       this.work = work;
-      this.waitType = waitType;
-      this.startTimeout = startTimeout;
       this.executionContext = executionContext;
       this.workListener = workListener;
-
-      setTask(this);
+      this.startedLatch = startedLatch;
+      this.completedLatch = completedLatch;
    }
-
+   
    /**
     * Get the work manager
     *
@@ -127,16 +125,6 @@
    }
 
    /**
-    * Retrieve the work listener
-    *
-    * @return the WorkListener
-    */
-   public WorkListener getWorkListener()
-   {
-      return workListener;
-   }
-
-   /**
     * Retrieve the exection context
     *
     * @return the execution context
@@ -147,13 +135,13 @@
    }
 
    /**
-    * Retrieve the time blocked
+    * Retrieve the work listener
     *
-    * @return the blocked time
+    * @return the WorkListener
     */
-   public long getBlockedElapsed()
+   public WorkListener getWorkListener()
    {
-      return blockedTime;
+      return workListener;
    }
    
    /**
@@ -166,173 +154,167 @@
       return exception;
    }
 
-   public int getWaitType()
+   /**
+    * Run
+    */
+   public void run()
    {
-      return waitType;
-   }
+      if (trace)
+         log.trace("Starting work " + this);  
 
-   public int getPriority()
-   {
-      return Thread.NORM_PRIORITY;
-   }
+      try
+      {
+         start();
 
-   public long getStartTimeout()
-   {
-      return startTimeout;
-   }
+         if (startedLatch != null)
+            startedLatch.countDown();
 
-   public long getCompletionTimeout()
-   {
-      // [JBAS-6400] Conversion of seconds to milliseconds
-      return executionContext.getTransactionTimeout() * 1000L;
-   }
+         work.run();
 
-   public void execute()
-   {
-      if (trace)
-         log.trace("Executing work " + this);
-      try
-      {
-         workManager.startWork(this);
+         end();
       }
-      catch (WorkException e)
+      catch (Exception e)
       {
-         taskRejected(new NestedRuntimeException(e));
-         return;
-      }
-      try
-      {
-         work.run();
-      }
+         exception = new WorkCompletedException(e);
+
+         cancel();
+      } 
       finally
       {
-         workManager.endWork(this);
-      }
-      if (trace)
-         log.trace("Executed work " + this);
-   }
+         work.release();
 
-   public void stop()
-   {
-      if (trace)
-         log.trace("Stopping work " + this);
+         if (workListener != null)
+         {
+            WorkEvent event = new WorkEvent(workManager, WorkEvent.WORK_COMPLETED, work, exception);
+            workListener.workCompleted(event);
+         }
 
-      work.release();
-   }
+         if (startedLatch != null)
+         {
+            while (startedLatch.getCount() != 0)
+               startedLatch.countDown();
+         }
 
-   public void accepted(long time)
-   {
-      blockedTime = time;
+         if (completedLatch != null)
+            completedLatch.countDown();
 
-      if (trace)
-         log.trace("Accepted work " + this);
-
-      if (workListener != null)
-      {
-         WorkEvent event = new WorkEvent(workManager, WorkEvent.WORK_ACCEPTED, work, null);
-         workListener.workAccepted(event);
+         if (trace)
+            log.trace("Executed work " + this);  
       }
    }
 
-   public void rejected(long time, Throwable throwable)
+   /**
+    * Start
+    * @throws WorkException for any error 
+    */
+   protected void start() throws WorkException
    {
-      blockedTime = time;
-
       if (trace)
       {
-         if (throwable != null)
-            log.trace("Rejecting work " + this, throwable);
-         else
-            log.trace("Rejecting work " + this);
+         log.trace("Starting work " + this);  
       }
 
-      if (throwable != null)
+      ExecutionContext ctx = getExecutionContext();
+      
+      if (ctx != null)
       {
-         exception = new WorkRejectedException(throwable);
-         if (throwable instanceof StartTimeoutException)
-            exception.setErrorCode(WorkRejectedException.START_TIMED_OUT);
+         Xid xid = ctx.getXid();
+         if (xid != null)
+         {
+            //JBAS-4002 base value is in seconds as per the API, here we convert to millis
+            long timeout = (ctx.getTransactionTimeout() * 1000);
+            workManager.getXATerminator().registerWork(work, xid, timeout);
+         }
       }
       
-      workManager.cancelWork(this);
-      
-      if (workListener != null)
+      if (ctx != null)
       {
-         WorkEvent event = new WorkEvent(workManager, WorkEvent.WORK_ACCEPTED, work, exception);
-         workListener.workRejected(event);
+         Xid xid = ctx.getXid();
+         if (xid != null)
+         {
+            workManager.getXATerminator().startWork(work, xid);
+         }
       }
-   }
 
-   public void started(long time)
-   {
-      if (waitType != WAIT_NONE)
-         blockedTime = time;
-
       if (workListener != null)
       {
          WorkEvent event = new WorkEvent(workManager, WorkEvent.WORK_STARTED, work, null);
          workListener.workStarted(event);
       }
+
+      if (trace)
+      {
+         log.trace("Started work " + this);  
+      }
    }
 
-   public void completed(long time, Throwable throwable)
+   /**
+    * End
+    */
+   protected void end()
    {
-      if (waitType == WAIT_FOR_COMPLETE)
-         blockedTime = time;
+      if (trace)
+      {
+         log.trace("Ending work " + this);  
+      }
 
-      if (throwable != null)
-         exception = new WorkCompletedException(throwable);
+      ExecutionContext ctx = getExecutionContext();
 
+      if (ctx != null)
+      {
+         Xid xid = ctx.getXid();
+         if (xid != null)
+         {
+            workManager.getXATerminator().endWork(work, xid);
+         }
+      }
+
       if (trace)
-         log.trace("Completed work " + this);
+      {
+         log.trace("Ended work " + this);  
+      }
+   }
 
-      if (workListener != null)
+   /**
+    * Cancel
+    */
+   protected void cancel()
+   {
+      if (trace)
+         log.trace("Cancel work " + this);  
+
+      ExecutionContext ctx = getExecutionContext();
+
+      if (ctx != null)
       {
-         WorkEvent event = new WorkEvent(workManager, WorkEvent.WORK_COMPLETED, work, exception);
-         workListener.workCompleted(event);
+         Xid xid = ctx.getXid();
+         if (xid != null)
+         {
+            workManager.getXATerminator().cancelWork(work, xid);
+         }
       }
+
+      if (trace)
+         log.trace("Canceled work " + this);  
    }
    
+   /**
+    * String representation
+    * @return The string
+    */
    public String toString()
    {
-      JBossStringBuilder buffer = new JBossStringBuilder(100);
+      StringBuilder buffer = new StringBuilder(100);
       buffer.append("WorkWrapper@").append(Integer.toHexString(System.identityHashCode(this)));
       buffer.append("[workManger=").append(workManager);
       buffer.append(" work=").append(work);
-      buffer.append(" state=").append(getStateString());
+
       if (executionContext != null && executionContext.getXid() != null)
       {
          buffer.append(" xid=").append(executionContext.getXid());
          buffer.append(" txTimeout=").append(executionContext.getTransactionTimeout());
       }
-      buffer.append(" waitType=");
-      switch (waitType)
-      {
-         case WAIT_NONE:
-         {
-            buffer.append("WAIT_NONE");
-            break;
-         }
-         case WAIT_FOR_START:
-         {
-            buffer.append("WAIT_FOR_START");
-            break;
-         }
-         case WAIT_FOR_COMPLETE:
-         {
-            buffer.append("WAIT_FOR_COMPLETE");
-            break;
-         }
-         default:
-            buffer.append("???");
-      }
-      if (startTimeout != WorkManager.INDEFINITE)
-         buffer.append(" startTimeout=").append(startTimeout);
-      long completionTimeout = getCompletionTimeout();
-      if (completionTimeout != -1)
-         buffer.append(" completionTimeout=").append(completionTimeout);
-      if (blockedTime != 0)
-         buffer.append(" blockTime=").append(blockedTime);
-      buffer.append(" elapsedTime=").append(getElapsedTime());
+
       if (workListener != null)
          buffer.append(" workListener=").append(workListener);
       if (exception != null)




More information about the jboss-cvs-commits mailing list