[jboss-cvs] JBossAS SVN: r86973 - in branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered: local and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 8 13:10:06 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-04-08 13:10:05 -0400 (Wed, 08 Apr 2009)
New Revision: 86973
Added:
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java
Removed:
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
Modified:
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
Log:
[JBAS-5552] Sort issues with DeploymentRepository.addDeploymentContent(...)
Modified: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java 2009-04-08 16:56:21 UTC (rev 86972)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/ClusteredDeploymentRepository.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -374,57 +374,54 @@
// have a modification
RepositoryContentMetadata baseContent = null;
RepositoryContentMetadata latestContent = null;
- boolean unmodified = false;
if (!this.clusteringHandler.lockLocally())
{
// Don't throw an exception; just log error and indicate no mods
log.error("getModifiedDeployments(): Cannot acquire local lock");
return Collections.emptySet();
}
+
try
{
baseContent = this.localContentManager.getOfficialContentMetadata();
latestContent = this.localContentManager.getCurrentContentMetadata();
- unmodified = latestContent.equals(baseContent);
+ boolean unmodified = latestContent.equals(baseContent);
+
+ if (unmodified)
+ {
+ // Our metadata is in sync, so our content is in sync with the
+ // cluster. However, we might have had changes made to our
+ // content (e.g. via addDeploymentContent(...)) that have not
+ // been deployed. So, check for those...
+ return createModificationInfo();
+ }
}
finally
{
this.clusteringHandler.unlockLocally();
}
- Collection<ModificationInfo> result = null;
- if (unmodified)
+
+ // If we got here, our content is out of sync with the cluster, so take
+ // control of the cluster and bring the cluster in line with our current state
+ if (!this.clusteringHandler.lockGlobally())
{
- // Done
- result = Collections.emptySet();
+ // Don't throw an exception; just log error and indicate no mods
+ log.error("getModifiedDeployments(): Cannot acquire global lock");
+ return Collections.emptySet();
}
- else
- {
- // Something was modified, so take control of the cluster and
- // bring the cluster in line with our current state
+ try
+ {
+ // Tell the clustering handler to synchronize, but without
+ // pulling anything to cluster -- just push our changes
+ latestContent = this.clusteringHandler.synchronizeContent(false);
- if (!this.clusteringHandler.lockGlobally())
- {
- // Don't throw an exception; just log error and indicate no mods
- log.error("getModifiedDeployments(): Cannot acquire global lock");
- return Collections.emptySet();
- }
- try
- {
- // Tell the clustering handler to synchronize, but without
- // pulling anything to cluster -- just push our changes
- latestContent = this.clusteringHandler.synchronizeContent(false);
-
- return createModificationInfo();
- }
- finally
- {
- this.clusteringHandler.unlockGlobally();
- }
+ return createModificationInfo();
}
-
- return result;
-
+ finally
+ {
+ this.clusteringHandler.unlockGlobally();
+ }
}
public void unload()
Modified: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java 2009-04-08 16:56:21 UTC (rev 86972)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/AbstractLocalContentManager.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -39,7 +39,6 @@
import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
import org.jboss.system.server.profileservice.repository.clustered.sync.NoOpSynchronizationAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.RemovalMetadataInsertionAction;
-import org.jboss.system.server.profileservice.repository.clustered.sync.StreamReadAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationActionContext;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationId;
@@ -379,7 +378,7 @@
else if (rootName == null)
{
// Use the first root that can accept children
- URI rootURI = namedURIMap.get(rootName);
+ URI rootURI = namedURIMap.get(rmd.getName());
VirtualFile vf = getCachedVirtualFile(rootURI);
if (isDirectory(vf))
{
@@ -407,15 +406,19 @@
throw new IllegalArgumentException("Unknown root name " + toAdd.getRootName());
}
RepositoryItemMetadata remove = rmd.getItemMetadata(toAdd.getRelativePathElements());
- if (remove.isDirectory())
+ if (remove != null)
{
- for (RepositoryItemMetadata rim : rmd.getContent())
+ if (remove.isDirectory())
{
- if (rim.isChildOf(remove))
+ for (RepositoryItemMetadata rim : rmd.getContent())
{
- rmd.removeItemMetadata(rim.getRelativePathElements());
+ if (rim.isChildOf(remove))
+ {
+ rmd.removeItemMetadata(rim.getRelativePathElements());
+ }
}
}
+ rmd.removeItemMetadata(remove.getRelativePathElements());
}
rmd.addItemMetadata(toAdd);
pendingStreams.put(toAdd, contentIS);
@@ -426,13 +429,15 @@
{
URI uri = namedURIMap.get(item.getRootName());
VirtualFile vf = getCachedVirtualFile(uri);
+ VirtualFile parent = null;
List<String> path = item.getRelativePathElements();
for (String element : path)
{
- vf = vf.getChild(element);
+ parent = vf;
+ vf = parent.getChild(element);
if (vf == null)
{
- throw new IllegalStateException("No child " + element + " under " + vf);
+ throw new IllegalStateException("No child " + element + " under " + parent);
}
}
return vf;
@@ -465,18 +470,21 @@
}
RepositoryItemMetadata remove = root.getItemMetadata(path);
- if (isDirectory(vf))
+ if (remove != null)
{
- for (RepositoryItemMetadata rim : root.getContent())
+ if (isDirectory(vf))
{
- if (rim.isChildOf(remove))
+ for (RepositoryItemMetadata rim : root.getContent())
{
- root.removeItemMetadata(rim.getRelativePathElements());
+ if (rim.isChildOf(remove))
+ {
+ root.removeItemMetadata(rim.getRelativePathElements());
+ }
}
+
}
-
+ root.removeItemMetadata(path);
}
- root.removeItemMetadata(path);
return cmd;
}
@@ -489,6 +497,8 @@
protected abstract TwoPhaseCommitAction<T> createPullFromClusterAction(ContentModification mod, boolean localLed);
protected abstract TwoPhaseCommitAction<T> createPushToClusterAction(ContentModification mod, boolean localLed);
+
+ protected abstract TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream);
protected abstract TwoPhaseCommitAction<T> createRemoveFromClusterAction(ContentModification mod, boolean localLed);
@@ -599,11 +609,6 @@
}
}
- private TwoPhaseCommitAction<T> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
- {
- return new StreamReadAction<T>(stream, getSynchronizationActionContext(), mod);
- }
-
private void validateSynchronization(SynchronizationId<?> id)
{
if (id == null)
Copied: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java (from rev 86969, trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java)
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java (rev 0)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AbstractFileWriteAction.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+
+/**
+ * Base class for actions that write to a {@link File}.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public abstract class AbstractFileWriteAction extends AbstractLocalContentChangeAction
+{
+
+ private File tempFile;
+ private OutputStream stream;
+
+ /**
+ * Create a new AbstractFileWriteAction.
+ *
+ * @param targetFile the file to write to
+ * @param context the overall context of the modification
+ * @param modification the modification
+ */
+ public AbstractFileWriteAction(File targetFile, FileBasedSynchronizationActionContext context,
+ ContentModification modification)
+ {
+ super(targetFile, context, modification);
+ }
+
+ // -------------------------------------------------------------- Protected
+
+ protected void writeBytes(ByteChunk bytes) throws IOException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("Null bytes");
+ }
+ if (bytes.getByteCount() < 0)
+ {
+ throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
+ }
+ OutputStream os = getOutputStream();
+ os.write(bytes.getBytes(), 0, bytes.getByteCount());
+ }
+
+ @Override
+ protected void doComplete() throws Exception
+ {
+ // Done writing
+ safeCloseStream();
+ super.doComplete();
+ }
+
+ @Override
+ protected boolean modifyTarget() throws IOException
+ {
+ // Our temp file replaces targetFile
+ FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
+ return true;
+ }
+
+ protected synchronized void safeCleanup(boolean cleanRollback)
+ {
+ super.safeCleanup(cleanRollback);
+ safeCloseStream();
+ if (tempFile != null)
+ {
+ tempFile.delete();
+ }
+ }
+
+ // ---------------------------------------------------------------- Private
+
+ private synchronized OutputStream getOutputStream() throws IOException
+ {
+ State s = getState();
+ if (s != State.OPEN && s != State.CANCELLED)
+ {
+ throw new IllegalStateException("Cannot write when state is " + s);
+ }
+
+ if (stream == null)
+ {
+ FileOutputStream fos = new FileOutputStream(getTempFile());
+ stream = new BufferedOutputStream(fos);
+ }
+ return stream;
+ }
+
+ private File getTempFile() throws IOException
+ {
+ if (tempFile == null)
+ {
+ tempFile = createTempFile();
+ }
+ return tempFile;
+ }
+
+ private synchronized void safeCloseStream()
+ {
+ if (stream != null)
+ {
+ synchronized (stream)
+ {
+ try
+ {
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ ContentModification mod = getRepositoryContentModification();
+ getLogger().debug("Caught exception closing stream for " + mod.getRootName() +
+ " " + mod.getItem().getRelativePath(), e);
+ }
+ finally
+ {
+ stream = null;
+ }
+ }
+ }
+ }
+
+}
Copied: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java (from rev 86969, trunk/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java)
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java (rev 0)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/AddContentStreamAction.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -0,0 +1,226 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.system.server.profileservice.repository.clustered.local.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.jboss.logging.Logger;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
+import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
+import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
+
+/**
+ * {@link SynchronizationReadAction} that reads from an {@link InputStream}
+ * and besides returning {@link ByteChunk}s, also writes the stream contents
+ * to a local file. Used for pushing content from a stream to both the cluster
+ * and the local filesystem.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision: $
+ */
+public class AddContentStreamAction extends AbstractFileWriteAction
+ implements SynchronizationReadAction<FileBasedSynchronizationActionContext>
+{
+ private static final Logger log = Logger.getLogger(AddContentStreamAction.class);
+
+ /**
+ * Max file transfer buffer size that we read at a time.
+ * This influences the number of times that we will invoke disk read/write file
+ * operations versus how much memory we will consume for a file transfer.
+ */
+ public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
+
+ private final InputStream stream;
+
+ /**
+ * Create a new AddContentStreamAction.
+ *
+ * @param stream the stream to read
+ * @param context the overall context of the modification
+ * @param modification the modification
+ */
+ public AddContentStreamAction(InputStream stream, File targetFile,
+ FileBasedSynchronizationActionContext context,
+ ContentModification modification)
+ {
+ super(targetFile, context, modification);
+ if (stream == null)
+ {
+ throw new IllegalArgumentException("Null stream");
+ }
+ this.stream = stream;
+ }
+
+ // ------------------------------------ RepositorySynchronizationReadAction
+
+ public ByteChunk getNextBytes() throws IOException
+ {
+ InputStream is = getInputStream();
+ byte[] b = null;
+ int read = -1;
+ synchronized (is)
+ {
+ b = new byte[MAX_CHUNK_BUFFER_SIZE];
+ read = is.read(b);
+ }
+ ByteChunk byteChunk = new ByteChunk(b, read);
+
+ // Write the bytes to our temp file as well
+ if (byteChunk.getByteCount() > -1)
+ {
+ writeBytes(byteChunk);
+ }
+
+ return byteChunk;
+ }
+
+ // -------------------------------------------------------------- Protected
+
+ @Override
+ protected Logger getLogger()
+ {
+ return log;
+ }
+
+ @Override
+ protected void doCancel()
+ {
+ try
+ {
+ super.doCancel();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doCommit()
+ {
+ try
+ {
+ super.doCommit();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doComplete() throws Exception
+ {
+ try
+ {
+ super.doComplete();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected boolean doPrepare()
+ {
+ try
+ {
+ return super.doPrepare();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromCancelled()
+ {
+ try
+ {
+ super.doRollbackFromCancelled();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromOpen()
+ {
+ try
+ {
+ super.doRollbackFromOpen();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ @Override
+ protected void doRollbackFromRollbackOnly()
+ {
+ try
+ {
+ super.doRollbackFromRollbackOnly();
+ }
+ finally
+ {
+ safeCloseStream();
+ }
+ }
+
+ private synchronized InputStream getInputStream() throws IOException
+ {
+ State s = getState();
+ if (s != State.OPEN && s != State.CANCELLED)
+ {
+ throw new IllegalStateException("Cannot read when state is " + s);
+ }
+ return stream;
+ }
+
+ private synchronized void safeCloseStream()
+ {
+ synchronized (stream)
+ {
+ try
+ {
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ ContentModification mod = getRepositoryContentModification();
+ log.debug("Caught exception closing stream for " + mod.getRootName() +
+ " " + mod.getItem().getRelativePath(), e);
+ }
+ }
+ }
+
+}
Modified: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java 2009-04-08 16:56:21 UTC (rev 86972)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FileWriteAction.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -22,32 +22,25 @@
package org.jboss.system.server.profileservice.repository.clustered.local.file;
-import java.io.BufferedOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import org.jboss.logging.Logger;
import org.jboss.system.server.profileservice.repository.clustered.sync.ByteChunk;
import org.jboss.system.server.profileservice.repository.clustered.sync.ContentModification;
-import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationReadAction;
import org.jboss.system.server.profileservice.repository.clustered.sync.SynchronizationWriteAction;
/**
- * {@link SynchronizationReadAction} that writes to a {@link File}.
+ * {@link SynchronizationWriteAction} that writes to a {@link File}.
*
* @author Brian Stansberry
*
* @version $Revision: $
*/
-public class FileWriteAction extends AbstractLocalContentChangeAction
+public class FileWriteAction extends AbstractFileWriteAction
implements SynchronizationWriteAction<FileBasedSynchronizationActionContext>
{
private static final Logger log = Logger.getLogger(FileWriteAction.class);
-
- private File tempFile;
- private OutputStream stream;
/**
* Create a new FileWriteAction.
@@ -66,102 +59,15 @@
public void writeBytes(ByteChunk bytes) throws IOException
{
- if (bytes == null)
- {
- throw new IllegalArgumentException("Null bytes");
- }
- if (bytes.getByteCount() < 0)
- {
- throw new IllegalArgumentException("Illegal byte count " + bytes.getByteCount());
- }
- OutputStream os = getOutputStream();
- os.write(bytes.getBytes(), 0, bytes.getByteCount());
+ super.writeBytes(bytes);
}
// -------------------------------------------------------------- Protected
-
-
- @Override
- protected void doComplete() throws Exception
- {
- // Done writing
- safeCloseStream();
- super.doComplete();
- }
@Override
- protected boolean modifyTarget() throws IOException
- {
- // Our temp file replaces targetFile
- FileUtil.localMove(tempFile, getTargetFile(), getRepositoryContentModification().getItem().getTimestamp());
- return true;
- }
-
- @Override
protected Logger getLogger()
{
return log;
}
- protected synchronized void safeCleanup(boolean cleanRollback)
- {
- super.safeCleanup(cleanRollback);
- safeCloseStream();
- if (tempFile != null)
- {
- tempFile.delete();
- }
- }
-
- // ---------------------------------------------------------------- Private
-
- private synchronized OutputStream getOutputStream() throws IOException
- {
- State s = getState();
- if (s != State.OPEN && s != State.CANCELLED)
- {
- throw new IllegalStateException("Cannot write when state is " + s);
- }
-
- if (stream == null)
- {
- FileOutputStream fos = new FileOutputStream(getTempFile());
- stream = new BufferedOutputStream(fos);
- }
- return stream;
- }
-
- private File getTempFile() throws IOException
- {
- if (tempFile == null)
- {
- tempFile = createTempFile();
- }
- return tempFile;
- }
-
- private synchronized void safeCloseStream()
- {
- if (stream != null)
- {
- synchronized (stream)
- {
- try
- {
- stream.close();
- }
- catch (IOException e)
- {
- ContentModification mod = getRepositoryContentModification();
- log.debug("Caught exception closing stream for " + mod.getRootName() +
- " " + mod.getItem().getRelativePath(), e);
- }
- finally
- {
- stream = null;
- }
- }
- }
- }
-
}
Modified: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java 2009-04-08 16:56:21 UTC (rev 86972)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/local/file/FilesystemLocalContentManager.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -23,6 +23,7 @@
package org.jboss.system.server.profileservice.repository.clustered.local.file;
import java.io.File;
+import java.io.InputStream;
import java.net.URI;
import java.util.Map;
@@ -113,6 +114,13 @@
}
@Override
+ protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createPushStreamToClusterAction(ContentModification mod, InputStream stream)
+ {
+ File targetFile = FileUtil.getFileForItem(getRootURIForModification(mod), mod.getItem());
+ return new AddContentStreamAction(stream, targetFile, getSynchronizationActionContext(), mod);
+ }
+
+ @Override
protected TwoPhaseCommitAction<FileBasedSynchronizationActionContext> createRemoveFromClusterAction(
ContentModification mod, boolean localLed)
{
Deleted: branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java
===================================================================
--- branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java 2009-04-08 16:56:21 UTC (rev 86972)
+++ branches/Branch_5_x/system/src/main/org/jboss/system/server/profileservice/repository/clustered/sync/StreamReadAction.java 2009-04-08 17:10:05 UTC (rev 86973)
@@ -1,169 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.system.server.profileservice.repository.clustered.sync;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.jboss.logging.Logger;
-
-/**
- * {@link SynchronizationReadAction} that reads from a {@link File}.
- *
- * @author Brian Stansberry
- *
- * @version $Revision: $
- */
-public class StreamReadAction<T extends SynchronizationActionContext> extends AbstractSynchronizationAction<T>
- implements SynchronizationReadAction<T>
-{
- private static final Logger log = Logger.getLogger(StreamReadAction.class);
-
- /**
- * Max file transfer buffer size that we read at a time.
- * This influences the number of times that we will invoke disk read/write file
- * operations versus how much memory we will consume for a file transfer.
- */
- public static final int MAX_CHUNK_BUFFER_SIZE = 512 * 1024;
-
- private final InputStream stream;
-
- /**
- * Create a new StreamReadAction.
- *
- * @param stream the stream to read
- * @param context the overall context of the modification
- * @param modification the modification
- */
- public StreamReadAction(InputStream stream, T context,
- ContentModification modification)
- {
- super(context, modification);
- if (stream == null)
- {
- throw new IllegalArgumentException("Null stream");
- }
- this.stream = stream;
- }
-
- // ------------------------------------ RepositorySynchronizationReadAction
-
- public ByteChunk getNextBytes() throws IOException
- {
- InputStream is = getInputStream();
- byte[] b = null;
- int read = -1;
- synchronized (is)
- {
- b = new byte[MAX_CHUNK_BUFFER_SIZE];
- read = is.read(b);
- }
- return new ByteChunk(b, read);
- }
-
- // -------------------------------------------------------------- Protected
-
- @Override
- protected void doCancel()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doCommit()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doComplete() throws Exception
- {
- safeCloseStream();
- }
-
- @Override
- protected boolean doPrepare()
- {
- safeCloseStream();
- return true;
- }
-
- @Override
- protected void doRollbackFromCancelled()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromComplete()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromOpen()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromPrepared()
- {
- safeCloseStream();
- }
-
- @Override
- protected void doRollbackFromRollbackOnly()
- {
- safeCloseStream();
- }
-
- private synchronized InputStream getInputStream() throws IOException
- {
- State s = getState();
- if (s != State.OPEN && s != State.CANCELLED)
- {
- throw new IllegalStateException("Cannot read when state is " + s);
- }
- return stream;
- }
-
- private synchronized void safeCloseStream()
- {
- synchronized (stream)
- {
- try
- {
- stream.close();
- }
- catch (IOException e)
- {
- ContentModification mod = getRepositoryContentModification();
- log.debug("Caught exception closing stream for " + mod.getRootName() +
- " " + mod.getItem().getRelativePath(), e);
- }
- }
- }
-
-}
More information about the jboss-cvs-commits
mailing list