[jboss-cvs] JBossAS SVN: r86969 - in trunk: system/src/main/org/jboss/system/server/profileservice/repository/clustered/local and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 8 12:40:41 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-04-08 12:40:41 -0400 (Wed, 08 Apr 2009)
New Revision: 86969
Added:
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
Removed:
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
Modified:
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
Log:
[JBAS-5552] Sort issues with DeploymentRepository.addDeploymentContent(...)
Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java 2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -374,57 +374,54 @@
// have a modification
RepositoryContentMetadata baseContent = null;
RepositoryContentMetadata latestContent = null;
- boolean unmodified = false;
if (!this.clusteringHandler.lockLocally())
{
// Don't throw an exception; just log error and indicate no mods
log.error("getModifiedDeployments(): Cannot acquire local lock");
return Collections.emptySet();
}
+
try
{
baseContent = this.localContentManager.getOfficialContentMetadata();
latestContent = this.localContentManager.getCurrentContentMetadata();
- unmodified = latestContent.equals(baseContent);
+ boolean unmodified = latestContent.equals(baseContent);
+
+ if (unmodified)
+ {
+ // Our metadata is in sync, so our content is in sync with the
+ // cluster. However, we might have had changes made to our
+ // content (e.g. via addDeploymentContent(...)) that have not
+ // been deployed. So, check for those...
+ return createModificationInfo();
+ }
}
finally
{
this.clusteringHandler.unlockLocally();
}
- Collection<ModificationInfo> result = null;
- if (unmodified)
+
+ // If we got here, our content is out of sync with the cluster, so take
+ // control of the cluster and bring the cluster in line with our current state
+ if (!this.clusteringHandler.lockGlobally())
{
- // Done
- result = Collections.emptySet();
+ // Don't throw an exception; just log error and indicate no mods
+ log.error("getModifiedDeployments(): Cannot acquire global lock");
+ return Collections.emptySet();
}
- else
- {
- // Something was modified, so take control of the cluster and
- // bring the cluster in line with our current state
+ try
+ {
+ // Tell the clustering handler to synchronize, but without
+ // pulling anything from the cluster -- just push our changes
+ latestContent = this.clusteringHandler.synchronizeContent(false);
- if (!this.clusteringHandler.lockGlobally())
- {
- // Don't throw an exception; just log error and indicate no mods
- log.error("getModifiedDeployments(): Cannot acquire global lock");
- return Collections.emptySet();
- }
- try
- {
- // Tell the clustering handler to synchronize, but without
- // pulling anything to cluster -- just push our changes
- latestContent = this.clusteringHandler.synchronizeContent(false);
-
- return createModificationInfo();
- }
- finally
- {
- this.clusteringHandler.unlockGlobally();
- }
+ return createModificationInfo();
}
-
- return result;
-
+ finally
+ {
+ this.clusteringHandler.unlockGlobally();
+ }
}
public void unload()
Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java 2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -39,7 +39,6 @@
import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
import org.jboss.system.server.profileservice.repository.clustered.sync.NoOpSynchronizationAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.RemovalMetadataInsertionAction;
-import org.jboss.system.server.profileservice.repository.clustered.sync.StreamReadAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationActionContext;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationId;
@@ -379,7 +378,7 @@
else if (rootName == null)
{
// Use the first root that can accept children
- URI rootURI = namedURIMap.get(rootName);
+ URI rootURI = namedURIMap.get(rmd.getName());
VirtualFile vf = getCachedVirtualFile(rootURI);
if (isDirectory(vf))
{
@@ -407,15 +406,19 @@
throw new IllegalArgumentException("Unknown root name " + toAdd.getRootName());
}
RepositoryItemMetadata remove = rmd.getItemMetadata(toAdd.getRelativePathElements());
- if (remove.isDirectory())
+ if (remove != null)
{
- for (RepositoryItemMetadata rim : rmd.getContent())
+ if (remove.isDirectory())
{
- if (rim.isChildOf(remove))
+ for (RepositoryItemMetadata rim : rmd.getContent())
{
- rmd.removeItemMetadata(rim.getRelativePathElements());
+ if (rim.isChildOf(remove))
+ {
+ rmd.removeItemMetadata(rim.getRelativePathElements());
+ }
}
}
+ rmd.removeItemMetadata(remove.getRelativePathElements());
}
rmd.addItemMetadata(toAdd);
pendingStreams.put(toAdd, contentIS);
@@ -426,13 +429,15 @@
{
URI uri = namedURIMap.get(item.getRootName());
VirtualFile vf = getCachedVirtualFile(uri);
+ VirtualFile parent = null;
List<String> path = item.getRelativePathElements();
for (String element : path)
{
- vf = vf.getChild(element);
+ parent = vf;
+ vf = parent.getChild(element);
if (vf == null)
{
- throw new IllegalStateException("No child " + element + " under " + vf);
+ throw new IllegalStateException("No child " + element + " under " + parent);
}
}
return vf;
@@ -465,18 +470,21 @@
}
RepositoryItemMetadata remove = root.getItemMetadata(path);
- if (isDirectory(vf))
+ if (remove != null)
{
- for (RepositoryItemMetadata rim : root.getContent())
+ if (isDirectory(vf))
{
- if (rim.isChildOf(remove))
+ for (RepositoryItemMetadata rim : root.getContent())
{
- root.removeItemMetadata(rim.getRelativePathElements());
+ if (rim.isChildOf(remove))
+ {
+ root.removeItemMetadata(rim.getRelativePathElements());
+ }
}
+
}
-
+ root.removeItemMetadata(path);
}
- root.removeItemMetadata(path);
return cmd;
}
@@ -489,6 +497,8 @@
protected abstract TwoPhaseCommitAction<T> createPullFromClusterAction(ContentModification mod, boolean localLed);
protected abstract TwoPhaseCommitAction<T> createPushToClusterAction(ContentModification mod, boolean localLed);
+
+ protected abstract TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream);
protected abstract TwoPhaseCommitAction<T> createRemoveFromClusterAction(ContentModification mod, boolean localLed);
@@ -599,11 +609,6 @@
}
}
- private TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
- {
- return new StreamReadAction<T>(stream, getSynchronizationActionContext(), mod);
- }
-
private void validateSynchronization(SynchronizationId<?> id)
{
if (id == null)
Added: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java (rev 0)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+
+/**
+ * Base class for actions that write to a {@link File}.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public abstract class AbstractFileWriteAction extends AbstractLocalContentChangeAction
+{
+
+ private File tempFile;
+ private OutputStream stream;
+
+ /**
+ * Create a new AbstractFileWriteAction.
+ *
+ * @param targetFile the file to write to
+ * @param context the overall context of the modification
+ * @param modification the modification
+ */
+ public AbstractFileWriteAction(File targetFile, FileBasedSynchronizationActionContext context,
+ ContentModification modification)
+ {
+ super(targetFile, context, modification);
+ }
+
+ // -------------------------------------------------------------- Protected
+
+ protected void writeBytes(ByteChunk bytes) throws IOException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("Null bytes");
+ }
+ if (bytes.getByteCount() < 0)
+ {
+ throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
+ }
+ OutputStream os = getOutputStream();
+ os.write(bytes.getBytes(), 0, bytes.getByteCount());
+ }
+
+ @Override
+ protected void doComplete() throws Exception
+ {
+ // Done writing
+ safeCloseStream();
+ super.doComplete();
+ }
+
+ @Override
+ protected boolean modifyTarget() throws IOException
+ {
+ // Our temp file replaces targetFile
+ FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
+ return true;
+ }
+
+ protected synchronized void safeCleanup(boolean cleanRollback)
+ {
+ super.safeCleanup(cleanRollback);
+ safeCloseStream();
+ if (tempFile != null)
+ {
+ tempFile.delete();
+ }
+ }
+
+ // ---------------------------------------------------------------- Private
+
+ private synchronized OutputStream getOutputStream() throws IOException
+ {
+ State s = getState();
+ if (s != State.OPEN && s != State.CANCELLED)
+ {
+ throw new IllegalStateException("Cannot write when state is " + s);
+ }
+
+ if (stream == null)
+ {
+ FileOutputStream fos = new FileOutputStream(getTempFile());
+ stream = new BufferedOutputStream(fos);
+ }
+ return stream;
+ }
+
+ private File getTempFile() throws IOException
+ {
+ if (tempFile == null)
+ {
+ tempFile = createTempFile();
+ }
+ return tempFile;
+ }
+
+ private synchronized void safeCloseStream()
+ {
+ if (stream != null)
+ {
+ synchronized (stream)
+ {
+ try
+ {
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ ContentModification mod = getRepositoryContentModification();
+ getLogger().debug("Caught exception closing stream for " + mod.getRootName() +
+ " " + mod.getItem().getRelativePath(), e);
+ }
+ finally
+ {
+ stream = null;
+ }
+ }
+ }
+ }
+
+}
Property changes on: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
___________________________________________________________________
Name: svn:keywords
+
Added: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java (rev 0)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.jboss.logging.Logger;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
+
+/**
+ * {@link SynchronizationReadAction} that reads from an {@link InputStream}
+ * and besides returning {@link ByteChunk}s, also writes the stream contents
+ * to a local file. Used for pushing content from a stream to both the cluster
+ * and the local filesystem.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public class AddContentStreamAction extends AbstractFileWriteAction
+ implements SynchronizationReadAction<FileBasedSynchronizationActionContext>
+{
+ private static final Logger log = Logger.getLogger(AddContentStreamAction.class);
+
+ /**
+ * Max file transfer buffer size that we read at a time.
+ * This influences the number of times that we will invoke disk read/write file
+ * operations versus how much memory we will consume for a file transfer.
+ */
+ public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
+
+ private final InputStream stream;
+
+ /**
+ * Create a new AddContentStreamAction.
+ *
+ * @param stream the stream to read
+ * @param context the overall context of the modification
+ * @param modification the modification
+ */
+ public AddContentStreamAction(InputStream stream, File targetFile,
+ FileBasedSynchronizationActionContext context,
+ ContentModification modification)
+ {
+ super(targetFile, context, modification);
+ if (stream == null)
+ {
+ throw new IllegalArgumentException("Null stream");
+ }
+ this.stream = stream;
+ }
+
+ // ------------------------------------ RepositorySynchronizationReadAction
+
+ public ByteChunk getNextBytes() throws IOException
+ {
+ InputStream is = getInputStream();
+ byte[] b = null;
+ int read = -1;
+ synchronized (is)
+ {
+ b = new byte[MAX_CHUNK_BUFFER_SIZE];
+ read = is.read(b);
+ }
+ ByteChunk byteChunk = new ByteChunk(b, read);
+
+ // Write the bytes to our temp file as well
+ if (byteChunk.getByteCount() > -1)
+ {
+ writeBytes(byteChunk);
+ }
+
+ return byteChunk;
+ }
+
+ // -------------------------------------------------------------- Protected
+
+ @Override
+ protected Logger getLogger()
+ {
+ return log;
+ }
+
+ @Override
+ protected void doCancel()
+ {
+ try
+ {
+ super.doCancel();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doCommit()
+ {
+ try
+ {
+ super.doCommit();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doComplete() throws Exception
+ {
+ try
+ {
+ super.doComplete();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected boolean doPrepare()
+ {
+ try
+ {
+ return super.doPrepare();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromCancelled()
+ {
+ try
+ {
+ super.doRollbackFromCancelled();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromOpen()
+ {
+ try
+ {
+ super.doRollbackFromOpen();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromRollbackOnly()
+ {
+ try
+ {
+ super.doRollbackFromRollbackOnly();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ private synchronized InputStream getInputStream() throws IOException
+ {
+ State s = getState();
+ if (s != State.OPEN && s != State.CANCELLED)
+ {
+ throw new IllegalStateException("Cannot read when state is " + s);
+ }
+ return stream;
+ }
+
+ private synchronized void safeCloseStream()
+ {
+ synchronized (stream)
+ {
+ try
+ {
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ ContentModification mod = getRepositoryContentModification();
+ log.debug("Caught exception closing stream for " + mod.getRootName() +
+ " " + mod.getItem().getRelativePath(), e);
+ }
+ }
+ }
+
+}
Property changes on: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
___________________________________________________________________
Name: svn:keywords
+
Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java 2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -22,32 +22,25 @@
package org.jboss.system.server.profileservice.repository.clustered.local.file;
-import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import org.jboss.logging.Logger;
import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
-import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationWriteAction;
/**
- * {@link SynchronizationReadAction} that writes to a {@link File}.
+ * {@link SynchronizationWriteAction} that writes to a {@link File}.
*
* @author Brian Stansberry
*
* @version $Revision: $
*/
-public class FileWriteAction extends AbstractLocalContentChangeAction
+public class FileWriteAction extends AbstractFileWriteAction
implements SynchronizationWriteAction<FileBasedSynchronizationActionContext>
{
private static final Logger log = Logger.getLogger(FileWriteAction.class);
-
- private File tempFile;
- private OutputStream stream;
/**
* Create a new FileWriteAction.
@@ -66,102 +59,15 @@
public void writeBytes(ByteChunk bytes) throws IOException
{
- if (bytes == null)
- {
- throw new IllegalArgumentException("Null bytes");
- }
- if (bytes.getByteCount() < 0)
- {
- throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
- }
- OutputStream os = getOutputStream();
- os.write(bytes.getBytes(), 0, bytes.getByteCount());
+ super.writeBytes(bytes);
}
// -------------------------------------------------------------- Protected
-
-
- @Override
- protected void doComplete() throws Exception
- {
- // Done writing
- safeCloseStream();
- super.doComplete();
- }
@Override
- protected boolean modifyTarget() throws IOException
- {
- // Our temp file replaces targetFile
- FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
- return true;
- }
-
- @Override
protected Logger getLogger()
{
return log;
}
- protected synchronized void safeCleanup(boolean cleanRollback)
- {
- super.safeCleanup(cleanRollback);
- safeCloseStream();
- if (tempFile != null)
- {
- tempFile.delete();
- }
- }
-
- // ---------------------------------------------------------------- Private
-
- private synchronized OutputStream getOutputStream() throws IOException
- {
- State s = getState();
- if (s != State.OPEN && s != State.CANCELLED)
- {
- throw new IllegalStateException("Cannot write when state is " + s);
- }
-
- if (stream == null)
- {
- FileOutputStream fos = new FileOutputStream(getTempFile());
- stream = new BufferedOutputStream(fos);
- }
- return stream;
- }
-
- private File getTempFile() throws IOException
- {
- if (tempFile == null)
- {
- tempFile = createTempFile();
- }
- return tempFile;
- }
-
- private synchronized void safeCloseStream()
- {
- if (stream != null)
- {
- synchronized (stream)
- {
- try
- {
- stream.close();
- }
- catch (IOException e)
- {
- ContentModification mod = getRepositoryContentModification();
- log.debug("Caught exception closing stream for " + mod.getRootName() +
- " " + mod.getItem().getRelativePath(), e);
- }
- finally
- {
- stream = null;
- }
- }
- }
- }
-
}
Modified: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java 2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -23,6 +23,7 @@
package org.jboss.system.server.profileservice.repository.clustered.local.file;
import java.io.File;
+import java.io.InputStream;
import java.net.URI;
import java.util.Map;
@@ -113,6 +114,13 @@
}
@Override
+ protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
+ {
+ File targetFile = FileUtil.getFileForItem(getRootURIForModification(mod), mod.getItem());
+ return new AddContentStreamAction(stream, targetFile, getSynchronizationActionContext(), mod);
+ }
+
+ @Override
protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createRemoveFromClusterAction(
ContentModification mod, boolean localLed)
{
Deleted: trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
===================================================================
--- trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java 2009-04-08 15:51:12 UTC (rev 86968)
+++ trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -1,169 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.system.server.profileservice.repository.clustered.sync;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.jboss.logging.Logger;
-
-/**
- * {@link SynchronizationReadAction} that reads from a {@link File}.
- *
- * @author Brian Stansberry
- *
- * @version $Revision: $
- */
-public class StreamReadAction<T extends SynchronizationActionContext> extends AbstractSynchronizationAction<T>
- implements SynchronizationReadAction<T>
-{
- private static final Logger log = Logger.getLogger(StreamReadAction.class);
-
- /**
- * Max file transfer buffer size that we read at a time.
- * This influences the number of times that we will invoke disk read/write file
- * operations versus how much memory we will consume for a file transfer.
- */
- public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
-
- private final InputStream stream;
-
- /**
- * Create a new StreamReadAction.
- *
- * @param stream the stream to read
- * @param context the overall context of the modification
- * @param modification the modification
- */
- public StreamReadAction(InputStream stream, T context,
- ContentModification modification)
- {
- super(context, modification);
- if (stream == null)
- {
- throw new IllegalArgumentException("Null stream");
- }
- this.stream = stream;
- }
-
- // ------------------------------------ RepositorySynchronizationReadAction
-
- public ByteChunk getNextBytes() throws IOException
- {
- InputStream is = getInputStream();
- byte[] b = null;
- int read = -1;
- synchronized (is)
- {
- b = new byte[MAX_CHUNK_BUFFER_SIZE];
- read = is.read(b);
- }
- return new ByteChunk(b, read);
- }
-
- // -------------------------------------------------------------- Protected
-
- @Override
- protected void doCancel()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doCommit()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doComplete() throws Exception
- {
- safeCloseStream();
- }
-
- @Override
- protected boolean doPrepare()
- {
- safeCloseStream();
- return true;
- }
-
- @Override
- protected void doRollbackFromCancelled()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromComplete()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromOpen()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromPrepared()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromRollbackOnly()
- {
- safeCloseStream();
- }
-
- private synchronized InputStream getInputStream() throws IOException
- {
- State s = getState();
- if (s != State.OPEN && s != State.CANCELLED)
- {
- throw new IllegalStateException("Cannot read when state is " + s);
- }
- return stream;
- }
-
- private synchronized void safeCloseStream()
- {
- synchronized (stream)
- {
- try
- {
- stream.close();
- }
- catch (IOException e)
- {
- ContentModification mod = getRepositoryContentModification();
- log.debug("Caught exception closing stream for " + mod.getRootName() +
- " " + mod.getItem().getRelativePath(), e);
- }
- }
- }
-
-}
Added: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java 2009-04-08 16:40:41 UTC (rev 86969)
@@ -0,0 +1,277 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.profileservice.test;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import org.jboss.deployers.spi.management.ManagementView;
+import org.jboss.deployers.spi.management.deploy.DeploymentManager;
+import org.jboss.deployers.spi.management.deploy.DeploymentProgress;
+import org.jboss.deployers.spi.management.deploy.DeploymentStatus;
+import org.jboss.deployers.spi.management.deploy.ProgressEvent;
+import org.jboss.deployers.spi.management.deploy.ProgressListener;
+import org.jboss.managed.api.ManagedDeployment;
+import org.jboss.profileservice.spi.ProfileKey;
+import org.jboss.profileservice.spi.ProfileService;
+import org.jboss.test.JBossClusteredTestCase;
+import org.jboss.virtual.VFS;
+
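+/**
+ * Distributes content through the DeploymentManager under the "farm" profile
+ * and checks that the resulting MBean can be read from two cluster nodes.
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */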
+public class ClusteredDeploymentRepoAddContentTestCase
+ extends JBossClusteredTestCase implements ProgressListener
+{
+ /** Key of the "farm" profile; returned by getProfileKey() when getProfileName() is null. */
+ public static final ProfileKey farmProfile = new ProfileKey("farm");
+
+ protected ManagementView activeView;
+ protected DeploymentManager deployMgr;
+ private long eventCount = 0;
+
+ /**
+ * Create a new ClusteredDeploymentRepoAddContentTestCase.
+ *
+ * @param name the name passed through to the JBossClusteredTestCase constructor
+ */
+ public ClusteredDeploymentRepoAddContentTestCase(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Runs the distribute/start/stop/remove cycle for farm-addedcontent-service.xml
+ * as a "sar" deployment. The supplied tester polls the first two MBean server
+ * connections for the MBean jboss.system:service=FarmAddContentTestThreadPool.
+ *
+ * @throws Exception propagated from the deployment cycle; assertion failures also end the test
+ */
+ public void testDeployment() throws Exception
+ {
+ String name = "farm-addedcontent-service.xml";
+ ManagedDeploymentTester tester = new ManagedDeploymentTester()
+ {
+ public void testManagedDeployment() throws Exception
+ {
+ boolean node0OK = false;
+ boolean node1OK = false;
+
+ MBeanServerConnection[] adaptors = getAdaptors();
+ // NOTE(review): the ObjectName ends in "TestThreadPool" while the expected
+ // Name attribute below is "FarmAddContentThreadPool" - confirm against
+ // farm-addedcontent-service.xml.
+ ObjectName oname = new ObjectName("jboss.system:service=FarmAddContentTestThreadPool");
+
+ // Poll until both nodes report the MBean or 12 seconds have elapsed,
+ // sleeping 200 ms between passes.
+ long deadline = System.currentTimeMillis() + 12000;
+ do
+ {
+ if (!node0OK)
+ {
+ // Any failure reading the attribute is ignored so the next pass retries
+ try
+ {
+ node0OK = "FarmAddContentThreadPool".equals(adaptors[0].getAttribute(oname, "Name"));
+ }
+ catch (Exception ignored) {}
+ }
+ if (!node1OK)
+ {
+ // Same retry-on-failure handling for the second connection
+ try
+ {
+ node1OK = "FarmAddContentThreadPool".equals(adaptors[1].getAttribute(oname, "Name"));
+ }
+ catch (Exception ignored) {}
+ }
+
+ if (node0OK && node1OK)
+ {
+ break;
+ }
+
+ Thread.sleep(200);
+ }
+ while (System.currentTimeMillis() < deadline);
+
+ assertTrue("node0 OK", node0OK);
+ assertTrue("node1 OK", node1OK);
+ }
+
+ };
+ testDeployment(name, "sar", tester);
+// testDeployment(name, "sar", null);
+ }
+
+ /**
+ * Distributes the named content through the DeploymentManager obtained from
+ * node 0, starts it, checks it through the ManagementView, then stops and
+ * removes it. Every DeploymentProgress is run with this test registered as
+ * its ProgressListener.
+ *
+ * @param name deployment name; also passed to getDeployURL to locate the content
+ * @param type type expected in ManagedDeployment.getTypes() when that set is non-empty
+ * @param tester extra check run after the deployment has started; may be null
+ * @throws Exception propagated from any deployment step; assertion failures also end the test
+ */
+ protected void testDeployment(String name, String type, ManagedDeploymentTester tester) throws Exception
+ {
+ DeploymentManager deployMgr = getDeploymentManager(getNamingContext(0));
+ URL contentURL = getDeployURL(name);
+ assertNotNull(contentURL);
+ getLog().debug(contentURL);
+ // TODO - hack to get off JDK's url handling
+ // Everything before the first ":/" is replaced with the "vfszip" scheme
+ String urlString = contentURL.toExternalForm();
+ int p = urlString.indexOf(":/");
+ contentURL = new URL("vfszip" + urlString.substring(p));
+ getLog().debug(contentURL);
+
+ DeploymentStatus status;
+ DeploymentProgress progress = deployMgr.distribute(name, contentURL, true);
+ progress.addProgressListener(this);
+ progress.run();
+ // Stays empty until the distribute checks below have passed
+ String[] uploadedNames = {};
+ try
+ {
+ status = progress.getDeploymentStatus();
+ assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+ // It should not be running yet
+ assertFalse("DeploymentStatus.isRunning: " + status, status.isRunning());
+ assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+
+ // Get the unique deployment name
+ uploadedNames = progress.getDeploymentID().getRepositoryNames();
+ getLog().debug("Uploaded deployment names: "+Arrays.asList(uploadedNames));
+
+ // Now start the deployment
+ progress = deployMgr.start(uploadedNames);
+ progress.addProgressListener(this);
+ progress.run();
+ try
+ {
+ status = progress.getDeploymentStatus();
+ assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+ assertFalse("DeploymentStatus.isRunning: " + status, status.isRunning());
+ assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+ // Check for a managed deployment under the first uploaded name
+ ManagementView mgtView = getManagementView(getNamingContext(0));
+ ManagedDeployment deployment = mgtView.getDeployment(uploadedNames[0]);
+ assertNotNull(deployment);
+ getLog().info("Found " + type + " deployment: " + deployment);
+ // The type is only verified when the deployment reports at least one type
+ Set<String> types = deployment.getTypes();
+ if (types != null && types.isEmpty() == false)
+ assertTrue("Missing type: " + type + ", available: " + types, types.contains(type));
+ if (tester != null)
+ {
+ tester.testManagedDeployment();
+ }
+ }
+ finally
+ {
+ //Thread.sleep(15 * 1000); // 15 secs >> more than it takes for reaper to run :-)
+
+ // Stop/remove the deployment
+ progress = deployMgr.stop(uploadedNames);
+ progress.addProgressListener(this);
+ progress.run();
+ status = progress.getDeploymentStatus();
+ assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+ assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+ }
+ }
+ finally
+ {
+ // Runs even when the distribute checks failed, in which case
+ // uploadedNames is still the empty array.
+ progress = deployMgr.remove(uploadedNames);
+ progress.addProgressListener(this);
+ progress.run();
+ status = progress.getDeploymentStatus();
+ assertTrue("DeploymentStatus.isCompleted: " + status, status.isCompleted());
+ assertFalse("DeploymentStatus.isFailed: " + status, status.isFailed());
+ }
+ }
+ /**
+ * Obtain the ProfileService DeploymentManager. On first use it is looked up
+ * through ctx, the profile from getProfileKey() is loaded and VFS is
+ * initialised; later calls return the cached instance and ignore ctx.
+ * @param ctx naming context used for the "ProfileService" lookup on first use
+ * @return the cached DeploymentManager
+ * @throws Exception propagated from the lookup, profile loading or VFS initialisation
+ */
+ protected DeploymentManager getDeploymentManager(Context ctx)
+ throws Exception
+ {
+ if( deployMgr == null )
+ {
+ ProfileService ps = (ProfileService) ctx.lookup("ProfileService");
+ deployMgr = ps.getDeploymentManager();
+ deployMgr.loadProfile(getProfileKey());
+ // Init the VFS to setup the vfs* protocol handlers
+ VFS.init();
+ }
+ return deployMgr;
+ }
+
+ /**
+ * Obtain the ProfileService.ManagementView. On first use it is looked up
+ * through ctx and cached; the view is reloaded on every call.
+ * @param ctx naming context used for the "ProfileService" lookup on first use
+ * @return the cached ManagementView, after load() has been called on it
+ * @throws Exception propagated from the lookup, VFS initialisation or load()
+ */
+ protected ManagementView getManagementView(Context ctx)
+ throws Exception
+ {
+ if( activeView == null )
+ {
+ ProfileService ps = (ProfileService) ctx.lookup("ProfileService");
+ activeView = ps.getViewManager();
+ // Init the VFS to setup the vfs* protocol handlers
+ VFS.init();
+ }
+ // Reload
+ activeView.load();
+ return activeView;
+ }
+
+ /**
+ * Creates an InitialContext for one of the URLs returned by getNamingURLs().
+ *
+ * @param nodeIndex index into the array returned by getNamingURLs()
+ * @return a new InitialContext configured with the jnp NamingContextFactory
+ * @throws Exception propagated from getNamingURLs() or InitialContext creation
+ */
+ protected Context getNamingContext(int nodeIndex) throws Exception
+ {
+ // Connect to the JNDI of the node selected by nodeIndex
+ String[] urls = getNamingURLs();
+ Properties env1 = new Properties();
+ env1.setProperty(Context.INITIAL_CONTEXT_FACTORY,
+ "org.jnp.interfaces.NamingContextFactory");
+ env1.setProperty(Context.PROVIDER_URL, urls[nodeIndex]);
+ return new InitialContext(env1);
+ }
+
+ /**
+ * @return farmProfile when getProfileName() is null, otherwise a new
+ * ProfileKey built from getProfileName()
+ */
+ protected ProfileKey getProfileKey()
+ {
+ if(getProfileName() == null)
+ return farmProfile;
+
+ return new ProfileKey(getProfileName());
+ }
+
+ /**
+ * This implementation returns null, which makes getProfileKey() fall back
+ * to farmProfile.
+ *
+ * @return the ProfileKey.name to use when loading the profile
+ */
+ protected String getProfileName()
+ {
+ return null;
+ }
+
+ /**
+ * Extra check invoked by testDeployment(String, String, ManagedDeploymentTester)
+ * once the deployment has been started, when a non-null tester is supplied.
+ */
+ private interface ManagedDeploymentTester
+ {
+ void testManagedDeployment() throws Exception;
+ }
+
+ /**
+ * Counts the event in eventCount and logs it at debug level. Nothing else
+ * in this class reads eventCount.
+ *
+ * @param eventInfo the event to log
+ */
+ public void progressEvent(ProgressEvent eventInfo)
+ {
+ eventCount ++;
+ getLog().debug(eventInfo);
+ }
+
+}
Property changes on: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/profileservice/test/ClusteredDeploymentRepoAddContentTestCase.java
___________________________________________________________________
Name: svn:keywords
+
More information about the jboss-cvs-commits
mailing list