[exo-jcr-commits] exo-jcr SVN: r5451 - jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jan 13 10:23:36 EST 2012
Author: nzamosenchuk
Date: 2012-01-13 10:23:33 -0500 (Fri, 13 Jan 2012)
New Revision: 5451
Added:
jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
Modified:
jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java
Log:
EXOJCR-1709 : initial implementation based on PoC.
What is ready :
- initial code-architecture;
- index sync, when index list changes;
- configuration;
TODO:
- Sync index on start to get initial index;
- Failover capabilities;
- User/password for RSync server;
RSync approach is built upon JBossCache concept, overriding just few things that adds RSync invocation when processing an index list modification event
Modified: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2012-01-13 09:39:01 UTC (rev 5450)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2012-01-13 15:23:33 UTC (rev 5451)
@@ -29,6 +29,8 @@
import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
+import org.exoplatform.services.jcr.impl.core.query.lucene.SearchIndex;
import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory;
import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory.CacheType;
import org.exoplatform.services.jcr.jbosscache.PrivilegedJBossCacheHelper;
@@ -75,10 +77,25 @@
public static final Boolean PARAM_JBOSSCACHE_SHAREABLE_DEFAULT = Boolean.FALSE;
+ // RSYNC SERVER CONFIGURATION
+ public static final String PARAM_RSYNC_ENTRY_NAME = "rsync-entry-name";
+
+ public static final String PARAM_RSYNC_ENTRY_PATH = "rsync-entry-path";
+
+ public static final String PARAM_RSYNC_PORT = "rsync-port";
+
+ public static final int PARAM_RSYNC_PORT_DEFAULT = 873;
+
+ // TODO
+ public static final String PARAM_RSYNC_USER = "rsync-user";
+
+ public static final String PARAM_RSYNC_PASSWORD = "rsync-password";
+
+ // Fields
private final Cache<Serializable, Object> cache;
private final Fqn<String> rootFqn;
-
+
private final JmxRegistrationManager jmxManager;
public static final String LISTWRAPPER = "$lists".intern();
@@ -140,8 +157,10 @@
PrivilegedJBossCacheHelper.create(cache);
PrivilegedJBossCacheHelper.start(cache);
-
- this.jmxManager = ExoJBossCacheFactory.getJmxRegistrationManager(searchManager.getExoContainerContext(), cache, CacheType.INDEX_CACHE);
+
+ this.jmxManager =
+ ExoJBossCacheFactory.getJmxRegistrationManager(searchManager.getExoContainerContext(), cache,
+ CacheType.INDEX_CACHE);
if (jmxManager != null)
{
SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
@@ -171,18 +190,50 @@
if (!parentHandler.isInitialized())
{
- parentHandler.setIndexInfos(new JBossCacheIndexInfos(rootFqn, cache, true, modeHandler));
+ parentHandler.setIndexInfos(createIndexInfos(true, modeHandler, config, parentHandler));
parentHandler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(rootFqn, cache, true, modeHandler));
parentHandler.init();
}
if (!handler.isInitialized())
{
- handler.setIndexInfos(new JBossCacheIndexInfos(rootFqn, cache, false, modeHandler));
+ handler.setIndexInfos(createIndexInfos(false, modeHandler, config, handler));
handler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(rootFqn, cache, false, modeHandler));
handler.init();
}
}
+ /**
+ * Factory method for creating corresponding IndexInfos class. RSyncIndexInfos created if RSync configured
+ * and JBossCacheIndexInfos otherwise
+ *
+ * @param system
+ * @param modeHandler
+ * @param config
+ * @param handler
+ * @return
+ * @throws RepositoryConfigurationException
+ */
+ private IndexInfos createIndexInfos(Boolean system, IndexerIoModeHandler modeHandler, QueryHandlerEntry config,
+ QueryHandler handler) throws RepositoryConfigurationException
+ {
+ // read RSYNC configuration
+ String rsyncEntryName = config.getParameterValue(PARAM_RSYNC_ENTRY_NAME, null);
+ String rsyncEntryPath = config.getParameterValue(PARAM_RSYNC_ENTRY_PATH, null);
+ int rsyncPort = config.getParameterInteger(PARAM_RSYNC_PORT, PARAM_RSYNC_PORT_DEFAULT);
+
+ // rsync configured
+ if (rsyncEntryName != null)
+ {
+ return new RsyncIndexInfos(rootFqn, cache, system, modeHandler, ((SearchIndex)handler).getContext()
+ .getIndexDirectory(), rsyncPort, rsyncEntryName, rsyncEntryPath);
+ }
+ else
+ {
+ return new JBossCacheIndexInfos(rootFqn, cache, system, modeHandler);
+ }
+
+ }
+
protected Log getLogger()
{
return log;
@@ -202,9 +253,9 @@
return true;
}
- /**
- * {@inheritDoc}
- */
+ /**
+ * {@inheritDoc}
+ */
@Override
public void close()
{
@@ -222,8 +273,9 @@
});
}
}
- catch (Exception e) {
+ catch (Exception e)
+ {
log.warn("Not all JBoss Cache MBeans were unregistered.");
}
- }
+ }
}
Modified: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java 2012-01-13 09:39:01 UTC (rev 5450)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexInfos.java 2012-01-13 15:23:33 UTC (rev 5451)
@@ -64,17 +64,17 @@
private static final String LIST_KEY = "$listOfIndexes".intern();
- private final Cache<Serializable, Object> cache;
+ protected final Cache<Serializable, Object> cache;
/**
* Flag notifies if this IndexInfos is from system search manager or not.
*/
- private boolean system;
+ protected boolean system;
/**
* Used to retrieve the current mode
*/
- private final IndexerIoModeHandler modeHandler;
+ protected final IndexerIoModeHandler modeHandler;
/**
* This FQN points to cache node, where list of indexes for this {@link IndexInfos} instance is stored.
@@ -192,23 +192,36 @@
// read from cache to update lists
set = (Set<String>)cache.get(namesFqn, LIST_KEY);
}
- if (set != null)
+ refreshIndexes(set);
+ }
+ }
+
+ /**
+ * Update index configuration, when it changes on persistent storage
+ *
+ * @param set
+ */
+ protected void refreshIndexes(Set<String> set)
+ {
+ // do nothing if null is passed
+ if (set == null)
+ {
+ return;
+ }
+ setNames(set);
+ // callback multiIndex to refresh lists
+ try
+ {
+ MultiIndex multiIndex = getMultiIndex();
+ if (multiIndex != null)
{
- setNames(set);
- // callback multiIndex to refresh lists
- try
- {
- MultiIndex multiIndex = getMultiIndex();
- if (multiIndex != null)
- {
- multiIndex.refreshIndexList();
- }
- }
- catch (IOException e)
- {
- log.error("Failed to update indexes! " + e.getMessage(), e);
- }
+ multiIndex.refreshIndexList();
}
}
+ catch (IOException e)
+ {
+ log.error("Failed to update indexes! " + e.getMessage(), e);
+ }
}
+
}
Added: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
===================================================================
--- jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java (rev 0)
+++ jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java 2012-01-13 15:23:33 UTC (rev 5451)
@@ -0,0 +1,186 @@
+/*
+ * Copyright (C) 2003-2012 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache;
+
+import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jgroups.stack.IpAddress;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Created by The eXo Platform SAS
+ * Author : eXoPlatform
+ * exo at exoplatform.com
+ * Jan 11, 2012
+ */
+ at CacheListener
+public class RsyncIndexInfos extends JBossCacheIndexInfos
+{
+
+ private final Log log = ExoLogger.getLogger("exo.jcr.component.core.RsyncIndexInfos");
+
+ private final String indexPath;
+
+ private final String urlFormatString;
+
+ public RsyncIndexInfos(Fqn<String> rootFqn, Cache<Serializable, Object> cache, boolean system,
+ IndexerIoModeHandler modeHandler, String indexPath, int rsyncPort, String rsyncEntryName, String rsyncEntryPath)
+ throws RepositoryConfigurationException
+ {
+ super(rootFqn, cache, system, modeHandler);
+
+ String absoluteRsyncEntryPath;
+ try
+ {
+ this.indexPath = new File(indexPath).getCanonicalPath();
+ absoluteRsyncEntryPath = new File(rsyncEntryPath).getCanonicalPath();
+ }
+ catch (IOException e)
+ {
+ throw new RepositoryConfigurationException("Index path or rsyncEntry path is invalid.", e);
+ }
+
+ if (this.indexPath.startsWith(absoluteRsyncEntryPath))
+ {
+ // in relation to RSync Server Entry
+ // i.e. absolute index path is /var/portal/data/index/repo1/ws2
+ // i.e. RSync Server Entry is "index" pointing to /var/portal/data/index
+ // then relative path is repo1/ws2
+ // and whole url is "rsync://<addr>:<port>/<entryName>/repo1/ws2"
+ String relativeIndexPath = this.indexPath.substring(absoluteRsyncEntryPath.length());
+
+ // if client is Windows OS, need to replace all '\' with '/' used in url
+ if (File.separatorChar == '\\')
+ {
+ relativeIndexPath = relativeIndexPath.replace(File.separatorChar, '/');
+ }
+ // generate ready-to-use formatter string with address variable
+ urlFormatString = "rsync://%s:" + rsyncPort + "/" + rsyncEntryName + relativeIndexPath + "/";
+ }
+ else
+ {
+ throw new RepositoryConfigurationException(
+ "Invalid RSync configuration. Index must be placed in folder that is a descendant of RSync Server Entry. Current RSync Server Entry Path is : "
+ + absoluteRsyncEntryPath
+ + " but it doesnt hold Index folder, that is : "
+ + this.indexPath
+ + ". Please fix configuration according to JCR Documentation and restart application.");
+ }
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void refreshIndexes(Set<String> set)
+ {
+ // Call RSync to retrieve actual index from coordinator
+ if (modeHandler.getMode() == IndexerIoMode.READ_ONLY)
+ {
+ if (cache.getLocalAddress() instanceof IpAddress)
+ {
+ // Coordinator's address
+ String address =
+ ((IpAddress)((CacheSPI)cache).getRPCManager().getCoordinator()).getIpAddress().getHostAddress();
+ RSyncJob rSyncJob = new RSyncJob(String.format(urlFormatString, address), indexPath);
+ try
+ {
+ synchronized (this)
+ {
+ rSyncJob.execute();
+ }
+ }
+ catch (IOException e)
+ {
+ log.error("Failed to retrieve index using RSYNC", e);
+ }
+ }
+ }
+ // call super, after indexes are synchronized
+ super.refreshIndexes(set);
+ }
+
+ private class RSyncJob
+ {
+ private Process process;
+
+ private String src;
+
+ private String dst;
+
+ public RSyncJob(String src, String dst)
+ {
+ this.src = src.endsWith(File.separator) ? src : src + File.separator;
+ this.dst = dst;
+ }
+
+ public void execute() throws IOException
+ {
+ Runtime run = Runtime.getRuntime();
+ try
+ {
+ String command = "rsync -rv --delete " + src + " " + dst;
+ log.info("Rsync job started: " + command);
+ process = run.exec(command);
+
+ Integer returnCode = null;
+ // wait for thread
+ while (returnCode == null)
+ {
+ try
+ {
+ returnCode = process.waitFor();
+ }
+ catch (InterruptedException e)
+ {
+ // oops, this can happen sometimes
+ }
+ }
+ log.info("Rsync job finished: " + returnCode);
+ if (returnCode != 0)
+ {
+ throw new IOException("RSync job finished with exit code is " + returnCode);
+ }
+ }
+ finally
+ {
+ process = null;
+ }
+ }
+
+ public void forceCancel()
+ {
+ if (process != null)
+ {
+ process.destroy();
+ }
+ }
+ }
+
+}
Property changes on: jcr/branches/1.14-RSYNC/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/RsyncIndexInfos.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
More information about the exo-jcr-commits
mailing list