[jboss-cvs] JBossAS SVN: r86969 - in trunk: system/src/main/org/jboss/system/server/profileservice/repository/clustered/local and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 8 12:40:41 EDT 2009


Author: bstansberry at jboss.com
Date: 2009-04-08 12:40:41 -0400 (Wed, 08 Apr 2009)
New Revision: 86969

Added:
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
   trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
Removed:
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
Modified:
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
   trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
Log:
[JBAS-5552] Sort issues with DeploymentRepository.addDeploymentContent(...)

Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java	2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -374,57 +374,54 @@
       // have a modification
       RepositoryContentMetadata baseContent = null;
       RepositoryContentMetadata latestContent = null;
-      boolean unmodified = false;
       if (!this.clusteringHandler.lockLocally())
       {
          // Don't throw an exception; just log error and indicate no mods
          log.error("getModifiedDeployments(): Cannot acquire local lock");
          return Collections.emptySet();
       }
+      
       try
       {
          baseContent = this.localContentManager.getOfficialContentMetadata();
          latestContent = this.localContentManager.getCurrentContentMetadata();         
-         unmodified = latestContent.equals(baseContent);
+         boolean unmodified = latestContent.equals(baseContent);
+         
+         if (unmodified)
+         {
+            // Our metadata is in sync, so our content is in sync with the
+            // cluster. However, we might have had changes made to our
+            // content (e.g. via addDeploymentContenxt(...)) that have not
+            // been deployed. So, check for those...
+            return createModificationInfo();
+         }
       }
       finally
       {
          this.clusteringHandler.unlockLocally();
       }
       
-      Collection<ModificationInfo> result = null;
-      if (unmodified)
+      
+      // If we got here, our content is out of sync with the cluster, so take 
+      // control of the cluster and bring the cluster in line with our current state      
+      if (!this.clusteringHandler.lockGlobally())
       {
-         // Done
-         result = Collections.emptySet();
+         // Don't throw an exception; just log error and indicate no mods
+         log.error("getModifiedDeployments(): Cannot acquire global lock");
+         return Collections.emptySet();
       }
-      else
-      {
-         // Something was modified, so take control of the cluster and
-         // bring the cluster in line with our current state
+      try
+      {      
+         // Tell the clustering handler to synchronize, but without
+         // pulling anything to cluster -- just push our changes
+         latestContent = this.clusteringHandler.synchronizeContent(false);
          
-         if (!this.clusteringHandler.lockGlobally())
-         {
-            // Don't throw an exception; just log error and indicate no mods
-            log.error("getModifiedDeployments(): Cannot acquire global lock");
-            return Collections.emptySet();
-         }
-         try
-         {      
-            // Tell the clustering handler to synchronize, but without
-            // pulling anything to cluster -- just push our changes
-            latestContent = this.clusteringHandler.synchronizeContent(false);
-            
-            return createModificationInfo();
-         }
-         finally
-         {
-            this.clusteringHandler.unlockGlobally();
-         }
+         return createModificationInfo();
       }
-      
-      return result;
-      
+      finally
+      {
+         this.clusteringHandler.unlockGlobally();
+      }      
    }
    
    public void unload()

Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java	2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -39,7 +39,6 @@
 import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
 import org.jboss.system.server.profileservice.repository.clustered.sync.NoOpSynchronizationAction;
 import org.jboss.system.server.profileservice.repository.clustered.sync.RemovalMetadataInsertionAction;
-import org.jboss.system.server.profileservice.repository.clustered.sync.StreamReadAction;
 import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationAction;
 import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationActionContext;
 import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationId;
@@ -379,7 +378,7 @@
          else if (rootName == null)
          {
             // Use the first root that can accept children
-            URI rootURI = namedURIMap.get(rootName);
+            URI rootURI = namedURIMap.get(rmd.getName());
             VirtualFile vf = getCachedVirtualFile(rootURI);
             if (isDirectory(vf))
             {
@@ -407,15 +406,19 @@
          throw new IllegalArgumentException("Unknown root name " + toAdd.getRootName());
       }
       RepositoryItemMetadata remove = rmd.getItemMetadata(toAdd.getRelativePathElements());
-      if (remove.isDirectory())
+      if (remove != null)
       {
-         for (RepositoryItemMetadata rim : rmd.getContent())
+         if (remove.isDirectory())
          {
-            if (rim.isChildOf(remove))
+            for (RepositoryItemMetadata rim : rmd.getContent())
             {
-               rmd.removeItemMetadata(rim.getRelativePathElements());
+               if (rim.isChildOf(remove))
+               {
+                  rmd.removeItemMetadata(rim.getRelativePathElements());
+               }
             }
          }
+         rmd.removeItemMetadata(remove.getRelativePathElements());
       }
       rmd.addItemMetadata(toAdd);
       pendingStreams.put(toAdd, contentIS);      
@@ -426,13 +429,15 @@
    {
       URI uri = namedURIMap.get(item.getRootName());
       VirtualFile vf = getCachedVirtualFile(uri);
+      VirtualFile parent = null;
       List<String> path = item.getRelativePathElements();
       for (String element : path)
       {
-         vf = vf.getChild(element);
+         parent = vf;
+         vf = parent.getChild(element);
          if (vf == null)
          {
-            throw new IllegalStateException("No child " + element + " under " + vf);
+            throw new IllegalStateException("No child " + element + " under " + parent);
          }
       }
       return vf;
@@ -465,18 +470,21 @@
       }
       
       RepositoryItemMetadata remove = root.getItemMetadata(path);
-      if (isDirectory(vf))
+      if (remove != null)
       {
-         for (RepositoryItemMetadata rim : root.getContent())
+         if (isDirectory(vf))
          {
-            if (rim.isChildOf(remove))
+            for (RepositoryItemMetadata rim : root.getContent())
             {
-               root.removeItemMetadata(rim.getRelativePathElements());
+               if (rim.isChildOf(remove))
+               {
+                  root.removeItemMetadata(rim.getRelativePathElements());
+               }
             }
+            
          }
-         
+         root.removeItemMetadata(path);
       }
-      root.removeItemMetadata(path);
       return cmd;
    }  
    
@@ -489,6 +497,8 @@
    protected abstract TwoPhaseCommitAction<T> createPullFromClusterAction(ContentModification mod, boolean localLed);
    
    protected abstract TwoPhaseCommitAction<T> createPushToClusterAction(ContentModification mod, boolean localLed);
+
+   protected abstract TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream);
    
    protected abstract TwoPhaseCommitAction<T> createRemoveFromClusterAction(ContentModification mod, boolean localLed);
    
@@ -599,11 +609,6 @@
       }
    }
 
-   private TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
-   {
-      return new StreamReadAction<T>(stream, getSynchronizationActionContext(), mod);
-   }
-
    private void validateSynchronization(SynchronizationId<?> id)
    {
       if (id == null)

Added: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java	                        (rev 0)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+
+/**
+ * Base class for actions that write to a {@link File}.
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision: $
+ */
+public abstract class AbstractFileWriteAction extends AbstractLocalContentChangeAction
+{
+   
+   private File tempFile;
+   private OutputStream stream;
+
+   /**
+    * Create a new FileWriteAction.
+    * 
+    * @param targetFile the file to write to
+    * @param context the overall context of the modification
+    * @param modification the modification
+    */
+   public AbstractFileWriteAction(File targetFile, FileBasedSynchronizationActionContext context, 
+         ContentModification modification)
+   {
+      super(targetFile, context, modification);
+   }
+
+   // --------------------------------------------------------------  Protected
+   
+   protected void writeBytes(ByteChunk bytes) throws IOException
+   {
+      if (bytes == null)
+      {
+         throw new IllegalArgumentException("Null bytes");
+      }
+      if (bytes.getByteCount() < 0)
+      {
+         throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
+      }
+      OutputStream os = getOutputStream();
+      os.write(bytes.getBytes(), 0, bytes.getByteCount());
+   }
+
+   @Override
+   protected void doComplete() throws Exception
+   {
+      // Done writing
+      safeCloseStream();
+      super.doComplete();
+   }
+   
+   @Override
+   protected boolean modifyTarget() throws IOException
+   {
+      // Our temp file replaces targetFile
+      FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
+      return true;
+   }
+
+   protected synchronized void safeCleanup(boolean cleanRollback)
+   {
+      super.safeCleanup(cleanRollback);
+      safeCloseStream();
+      if (tempFile != null)
+      {
+         tempFile.delete();
+      }    
+   }
+   
+   // ----------------------------------------------------------------  Private
+   
+   private synchronized OutputStream getOutputStream() throws IOException
+   {
+      State s = getState();
+      if (s != State.OPEN && s != State.CANCELLED)
+      {
+         throw new IllegalStateException("Cannot write when state is " + s);
+      }
+      
+      if (stream == null)
+      {
+         FileOutputStream fos = new FileOutputStream(getTempFile());
+         stream = new BufferedOutputStream(fos);
+      }
+      return stream;
+   }
+   
+   private File getTempFile() throws IOException
+   {
+      if (tempFile == null)
+      {
+         tempFile = createTempFile();
+      }
+      return tempFile;
+   }
+   
+   private synchronized void safeCloseStream()
+   {
+      if (stream != null)
+      {
+         synchronized (stream)
+         {
+            try
+            {
+               stream.close();
+            }
+            catch (IOException e)
+            {
+               ContentModification mod = getRepositoryContentModification();
+               getLogger().debug("Caught exception closing stream for " + mod.getRootName() + 
+                     " " + mod.getItem().getRelativePath(), e);
+            }
+            finally
+            {
+               stream = null;
+            }
+         }
+      }
+   }
+
+}


Property changes on: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
___________________________________________________________________
Name: svn:keywords
   + 

Added: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java	                        (rev 0)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.jboss.logging.Logger;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
+
+/**
+ * {@link SynchronizationReadAction} that reads from a {@link InputStream}
+ * and besides returning {@link ByteChunk}s, also writes the stream contents
+ * to a local file. Used for pushing content from a stream to both the cluster
+ * and the local filesystem.
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision: $
+ */
+public class AddContentStreamAction extends AbstractFileWriteAction
+      implements SynchronizationReadAction<FileBasedSynchronizationActionContext>
+{
+   private static final Logger log = Logger.getLogger(AddContentStreamAction.class);
+   
+   /** 
+    * Max file transfer buffer size that we read at a time.
+    * This influences the number of times that we will invoke disk read/write file
+    * operations versus how much memory we will consume for a file transfer. 
+    */
+   public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
+   
+   private final InputStream stream;
+   
+   /**
+    * Create a new StreamReadAction.
+    * 
+    * @param stream the stream to read
+    * @param context the overall context of the modification
+    * @param modification the modification
+    */
+   public AddContentStreamAction(InputStream stream, File targetFile, 
+                           FileBasedSynchronizationActionContext context, 
+                           ContentModification modification)
+   {
+      super(targetFile, context, modification);
+      if (stream == null)
+      {
+         throw new IllegalArgumentException("Null stream");
+      }
+      this.stream = stream;
+   }
+   
+   // ------------------------------------  RepositorySynchronizationReadAction
+
+   public ByteChunk getNextBytes() throws IOException
+   {
+      InputStream is = getInputStream();
+      byte[] b = null;
+      int read = -1;
+      synchronized (is)
+      {
+         b = new byte[MAX_CHUNK_BUFFER_SIZE];
+         read = is.read(b);
+      }
+      ByteChunk byteChunk = new ByteChunk(b, read);
+      
+      // Write the bytes to our temp file as well
+      if (byteChunk.getByteCount() > -1)
+      {
+         writeBytes(byteChunk);
+      }
+      
+      return byteChunk;
+   }
+   
+   // --------------------------------------------------------------  Protected
+   
+   @Override
+   protected Logger getLogger()
+   {
+      return log;
+   }
+
+   @Override
+   protected void doCancel()
+   {
+      try
+      {
+         super.doCancel();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected void doCommit()
+   {
+      try
+      {
+         super.doCommit();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected void doComplete() throws Exception
+   {
+      try
+      {
+         super.doComplete();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected boolean doPrepare()
+   {
+      try
+      {
+         return super.doPrepare();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected void doRollbackFromCancelled()
+   {
+      try
+      {
+         super.doRollbackFromCancelled();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected void doRollbackFromOpen()
+   {
+      try
+      {
+         super.doRollbackFromOpen();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   @Override
+   protected void doRollbackFromRollbackOnly()
+   {
+      try
+      {
+         super.doRollbackFromRollbackOnly();
+      }
+      finally
+      {
+         safeCloseStream();
+      }
+   }
+
+   private synchronized InputStream getInputStream() throws IOException
+   {
+      State s = getState();
+      if (s != State.OPEN && s != State.CANCELLED)
+      {
+         throw new IllegalStateException("Cannot read when state is " + s);
+      }
+      return stream;
+   }
+   
+   private synchronized void safeCloseStream()
+   {
+      synchronized (stream)
+      {
+         try
+         {
+            stream.close();
+         }
+         catch (IOException e)
+         {
+            ContentModification mod = getRepositoryContentModification();
+            log.debug("Caught exception closing stream for " + mod.getRootName() + 
+                  " " + mod.getItem().getRelativePath(), e);
+         }
+      }
+   }
+
+}


Property changes on: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
___________________________________________________________________
Name: svn:keywords
   + 

Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java	2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -22,32 +22,25 @@
 
 package org.jboss.system.server.profileservice.repository.clustered.local.file;
 
-import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.jboss.logging.Logger;
 import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
 import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
-import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
 import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationWriteAction;
 
 /**
- * {@link SynchronizationReadAction} that writes to a {@link File}.
+ * {@link SynchronizationWriteAction} that writes to a {@link File}.
  *
  * @author Brian Stansberry
  * 
  * @version $Revision: $
  */
-public class FileWriteAction extends AbstractLocalContentChangeAction
+public class FileWriteAction extends AbstractFileWriteAction
       implements SynchronizationWriteAction<FileBasedSynchronizationActionContext>
 {
    private static final Logger log = Logger.getLogger(FileWriteAction.class);
-   
-   private File tempFile;
-   private OutputStream stream;
 
    /**
     * Create a new FileWriteAction.
@@ -66,102 +59,15 @@
    
    public void writeBytes(ByteChunk bytes) throws IOException
    {
-      if (bytes == null)
-      {
-         throw new IllegalArgumentException("Null bytes");
-      }
-      if (bytes.getByteCount() < 0)
-      {
-         throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
-      }
-      OutputStream os = getOutputStream();
-      os.write(bytes.getBytes(), 0, bytes.getByteCount());
+      super.writeBytes(bytes);
    }
 
    // --------------------------------------------------------------  Protected
-
-
-   @Override
-   protected void doComplete() throws Exception
-   {
-      // Done writing
-      safeCloseStream();
-      super.doComplete();
-   }
    
    @Override
-   protected boolean modifyTarget() throws IOException
-   {
-      // Our temp file replaces targetFile
-      FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
-      return true;
-   }
-   
-   @Override
    protected Logger getLogger()
    {
       return log;
    }
 
-   protected synchronized void safeCleanup(boolean cleanRollback)
-   {
-      super.safeCleanup(cleanRollback);
-      safeCloseStream();
-      if (tempFile != null)
-      {
-         tempFile.delete();
-      }    
-   }
-   
-   // ----------------------------------------------------------------  Private
-   
-   private synchronized OutputStream getOutputStream() throws IOException
-   {
-      State s = getState();
-      if (s != State.OPEN && s != State.CANCELLED)
-      {
-         throw new IllegalStateException("Cannot write when state is " + s);
-      }
-      
-      if (stream == null)
-      {
-         FileOutputStream fos = new FileOutputStream(getTempFile());
-         stream = new BufferedOutputStream(fos);
-      }
-      return stream;
-   }
-   
-   private File getTempFile() throws IOException
-   {
-      if (tempFile == null)
-      {
-         tempFile = createTempFile();
-      }
-      return tempFile;
-   }
-   
-   private synchronized void safeCloseStream()
-   {
-      if (stream != null)
-      {
-         synchronized (stream)
-         {
-            try
-            {
-               stream.close();
-            }
-            catch (IOException e)
-            {
-               ContentModification mod = getRepositoryContentModification();
-               log.debug("Caught exception closing stream for " + mod.getRootName() + 
-                     " " + mod.getItem().getRelativePath(), e);
-            }
-            finally
-            {
-               stream = null;
-            }
-         }
-      }
-   }
-
 }

Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java	2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -23,6 +23,7 @@
 package org.jboss.system.server.profileservice.repository.clustered.local.file;
 
 import java.io.File;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.Map;
 
@@ -113,6 +114,13 @@
    }
 
    @Override
+   protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
+   {
+      File targetFile = FileUtil.getFileForItem(getRootURIForModification(mod), mod.getItem());
+      return new AddContentStreamAction(stream, targetFile, getSynchronizationActionContext(), mod);
+   }
+
+   @Override
    protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createRemoveFromClusterAction(
          ContentModification mod, boolean localLed)
    {

Deleted: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java	2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -1,169 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.system.server.profileservice.repository.clustered.sync;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.jboss.logging.Logger;
-
-/**
- * {@link SynchronizationReadAction} that reads from a {@link File}.
- *
- * @author Brian Stansberry
- * 
- * @version $Revision: $
- */
-public class StreamReadAction<T extends SynchronizationActionContext> extends AbstractSynchronizationAction<T>
-      implements SynchronizationReadAction<T>
-{
-   private static final Logger log = Logger.getLogger(StreamReadAction.class);
-   
-   /** 
-    * Max file transfer buffer size that we read at a time.
-    * This influences the number of times that we will invoke disk read/write file
-    * operations versus how much memory we will consume for a file transfer. 
-    */
-   public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
-   
-   private final InputStream stream;
-   
-   /**
-    * Create a new StreamReadAction.
-    * 
-    * @param stream the stream to read
-    * @param context the overall context of the modification
-    * @param modification the modification
-    */
-   public StreamReadAction(InputStream stream, T context, 
-                         ContentModification modification)
-   {
-      super(context, modification);
-      if (stream == null)
-      {
-         throw new IllegalArgumentException("Null stream");
-      }
-      this.stream = stream;
-   }
-   
-   // ------------------------------------  RepositorySynchronizationReadAction
-
-   public ByteChunk getNextBytes() throws IOException
-   {
-      InputStream is = getInputStream();
-      byte[] b = null;
-      int read = -1;
-      synchronized (is)
-      {
-         b = new byte[MAX_CHUNK_BUFFER_SIZE];
-         read = is.read(b);
-      }
-      return new ByteChunk(b, read);
-   }
-   
-   // --------------------------------------------------------------  Protected
-
-   @Override
-   protected void doCancel()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doCommit()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doComplete() throws Exception
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected boolean doPrepare()
-   {
-      safeCloseStream();
-      return true;
-   }
-
-   @Override
-   protected void doRollbackFromCancelled()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doRollbackFromComplete()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doRollbackFromOpen()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doRollbackFromPrepared()
-   {
-      safeCloseStream();
-   }
-
-   @Override
-   protected void doRollbackFromRollbackOnly()
-   {
-      safeCloseStream();
-   }
-
-   private synchronized InputStream getInputStream() throws IOException
-   {
-      State s = getState();
-      if (s != State.OPEN && s != State.CANCELLED)
-      {
-         throw new IllegalStateException("Cannot read when state is " + s);
-      }
-      return stream;
-   }
-   
-   private synchronized void safeCloseStream()
-   {
-      synchronized (stream)
-      {
-         try
-         {
-            stream.close();
-         }
-         catch (IOException e)
-         {
-            ContentModification mod = getRepositoryContentModification();
-            log.debug("Caught exception closing stream for " + mod.getRootName() + 
-                  " " + mod.getItem().getRelativePath(), e);
-         }
-      }
-   }
-
-}

Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java	                        (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java	2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,277 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.profileservice.test;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.jboss.deployers.spi.management.ManagementView;
+import org.jboss.deployers.spi.management.deploy.DeploymentManager;
+import org.jboss.deployers.spi.management.deploy.DeploymentProgress;
+import org.jboss.deployers.spi.management.deploy.DeploymentStatus;
+import org.jboss.deployers.spi.management.deploy.ProgressEvent;
+import org.jboss.deployers.spi.management.deploy.ProgressListener;
+import org.jboss.managed.api.ManagedDeployment;
+import org.jboss.profileservice.spi.ProfileKey;
+import org.jboss.profileservice.spi.ProfileService;
+import org.jboss.test.JBossClusteredTestCase;
+import org.jboss.virtual.VFS;
+
+/**
+ *
+ *
+ * @author Brian Stansberry
+ * 
+ * @version $Revision: $
+ */
+public class ClusteredDeploymentRepoAddContentTestCase 
+      extends JBossClusteredTestCase implements ProgressListener
+{   
+   /** We use the default profile, defined by DeploymentManager to deploy apps. */
+   public static final ProfileKey farmProfile = new ProfileKey("farm");
+   
+   protected ManagementView activeView;
+   protected DeploymentManager deployMgr;
+   private long eventCount = 0;
+
+   /**
+    * Create a new ClusteredDeploymentRepoAddContentTestCase.
+    * 
+    * @param name
+    */
+   public ClusteredDeploymentRepoAddContentTestCase(String name)
+   {
+      super(name);
+   }
+
+   public void testDeployment() throws Exception
+   {
+      String name = "farm-addedcontent-service.xml";
+      ManagedDeploymentTester tester = new ManagedDeploymentTester()
+      {         
+         public void testManagedDeployment() throws Exception
+         {
+            boolean node0OK = false;
+            boolean node1OK = false;
+            
+            MBeanServerConnection[] adaptors = getAdaptors();
+            ObjectName oname = new ObjectName("jboss.system:service=FarmAddContentTestThreadPool");
+            
+            long deadline = System.currentTimeMillis() + 12000;
+            do
+            {
+               if (!node0OK)
+               {
+                  try
+                  {
+                     node0OK = "FarmAddContentThreadPool".equals(adaptors[0].getAttribute(oname, "Name"));
+                  }
+                  catch (Exception ignored) {}
+               }
+               if (!node1OK)
+               {
+                  try
+                  {
+                     node1OK = "FarmAddContentThreadPool".equals(adaptors[1].getAttribute(oname, "Name"));
+                  }
+                  catch (Exception ignored) {}                  
+               }
+               
+               if (node0OK && node1OK)
+               {
+                  break;
+               }
+               
+               Thread.sleep(200);
+            }
+            while (System.currentTimeMillis() < deadline);
+            
+            assertTrue("node0 OK", node0OK);
+            assertTrue("node1 OK", node1OK);
+         }
+         
+      };
+      testDeployment(name, "sar", tester);
+//      testDeployment(name, "sar", null);
+   }
+   
+   protected void testDeployment(String name, String type, ManagedDeploymentTester tester) throws Exception
+   {
+      DeploymentManager deployMgr = getDeploymentManager(getNamingContext(0));
+      URL contentURL = getDeployURL(name);
+      assertNotNull(contentURL);
+      getLog().debug(contentURL);
+      // TODO - hack to get off JDK's url handling
+      String urlString = contentURL.toExternalForm();
+      int p = urlString.indexOf(":/");
+      contentURL = new URL("vfszip" + urlString.substring(p));
+      getLog().debug(contentURL);
+
+      DeploymentStatus status;
+      DeploymentProgress progress = deployMgr.distribute(name, contentURL, true);
+      progress.addProgressListener(this);
+      progress.run();
+      String[] uploadedNames = {};
+      try
+      {
+         status = progress.getDeploymentStatus();
+         assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+         // It should not be running yet
+         assertFalse("DeploymentStatus.isRunning: " + status, status.isRunning());
+         assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+
+         // Get the unique deployment name
+         uploadedNames = progress.getDeploymentID().getRepositoryNames();
+         getLog().debug("Uploaded deployment names: "+Arrays.asList(uploadedNames));
+         
+         // Now start the deployment
+         progress = deployMgr.start(uploadedNames);
+         progress.addProgressListener(this);
+         progress.run();
+         try
+         {
+            status = progress.getDeploymentStatus();
+            assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+            assertFalse("DeploymentStatus.isRunning: " + status, status.isRunning());
+            assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+            // Check for a
+            ManagementView mgtView = getManagementView(getNamingContext(0));
+            ManagedDeployment deployment = mgtView.getDeployment(uploadedNames[0]);
+            assertNotNull(deployment);
+            getLog().info("Found " + type + " deployment: " + deployment);
+            Set<String> types = deployment.getTypes();
+            if (types != null && types.isEmpty() == false)
+               assertTrue("Missing type: " + type + ", available: " + types, types.contains(type));
+            if (tester != null)
+            {
+               tester.testManagedDeployment();
+            }
+         }
+         finally
+         {
+            //Thread.sleep(15 * 1000); // 15 secs >> more than it takes for reaper to run :-)
+
+            // Stop/remove the deployment
+            progress = deployMgr.stop(uploadedNames);
+            progress.addProgressListener(this);
+            progress.run();
+            status = progress.getDeploymentStatus();
+            assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+            assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+         }
+      }
+      finally
+      {
+         progress = deployMgr.remove(uploadedNames);
+         progress.addProgressListener(this);
+         progress.run();
+         status = progress.getDeploymentStatus();
+         assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+         assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+      }
+   }
+   /**
+    * Obtain the ProfileService.ManagementView
+    * @return
+    * @throws Exception
+    */
+   protected DeploymentManager getDeploymentManager(Context ctx)
+      throws Exception
+   {
+      if( deployMgr == null )
+      {
+         ProfileService ps = (ProfileService) ctx.lookup("ProfileService");
+         deployMgr = ps.getDeploymentManager();
+         deployMgr.loadProfile(getProfileKey());
+         // Init the VFS to setup the vfs* protocol handlers
+         VFS.init();
+      }
+      return deployMgr;
+   }
+
+   /**
+    * Obtain the ProfileService.ManagementView
+    * @return
+    * @throws Exception
+    */
+   protected ManagementView getManagementView(Context ctx)
+      throws Exception
+   {
+      if( activeView == null )
+      {
+         ProfileService ps = (ProfileService) ctx.lookup("ProfileService");
+         activeView = ps.getViewManager();
+         // Init the VFS to setup the vfs* protocol handlers
+         VFS.init();
+      }
+      // Reload
+      activeView.load();
+      return activeView;
+   }
+   
+   protected Context getNamingContext(int nodeIndex) throws Exception
+   {
+      // Connect to the server0 JNDI
+      String[] urls = getNamingURLs();
+      Properties env1 = new Properties();
+      env1.setProperty(Context.INITIAL_CONTEXT_FACTORY,
+         "org.jnp.interfaces.NamingContextFactory");
+      env1.setProperty(Context.PROVIDER_URL, urls[nodeIndex]);
+      return new InitialContext(env1);
+   }
+   
+   protected ProfileKey getProfileKey()
+   {
+      if(getProfileName() == null)
+         return farmProfile;
+      
+      return new ProfileKey(getProfileName());
+   }
+   
+   /**
+    * @return the ProfileKey.name to use when loading the profile
+    */
+   protected String getProfileName()
+   {
+      return null;
+   }
+
+   private interface ManagedDeploymentTester
+   {
+      void testManagedDeployment() throws Exception;
+   }
+
+   public void progressEvent(ProgressEvent eventInfo)
+   {
+      eventCount ++;
+      getLog().debug(eventInfo);
+   }
+
+}


Property changes on: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
___________________________________________________________________
Name: svn:keywords
   + 




More information about the jboss-cvs-commits mailing list