[infinispan-commits] Infinispan SVN: r2656 - trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Nov 3 05:03:56 EDT 2010


Author: NadirX
Date: 2010-11-03 05:03:55 -0400 (Wed, 03 Nov 2010)
New Revision: 2656

Modified:
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
Log:
Implement ISPN-757
Implement ISPN-758

Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-11-02 18:32:46 UTC (rev 2655)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-11-03 09:03:55 UTC (rev 2656)
@@ -12,9 +12,10 @@
 import java.util.Map;
 import java.util.Set;
 
-import net.dataforte.cassandra.pool.ConnectionPool;
+import net.dataforte.cassandra.pool.DataSource;
 
 import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraThriftDataSource;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnParent;
@@ -60,12 +61,18 @@
 	private static final boolean trace = log.isTraceEnabled();
 
 	private CassandraCacheStoreConfig config;
+	
+	private CassandraThriftDataSource dataSource;
+	
+	private ConsistencyLevel readConsistencyLevel;
+	private ConsistencyLevel writeConsistencyLevel;
 
-	private ConnectionPool pool;
-
+	private String cacheName;
 	private ColumnPath entryColumnPath;
 	private ColumnParent entryColumnParent;
 	private ColumnParent expirationColumnParent;
+	private String entryKeyPrefix;
+	private String expirationKey;
 
 	static private byte emptyByteArray[] = {};
 
@@ -76,6 +83,7 @@
 	@Override
 	public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
 		super.init(clc, cache, m);
+		this.cacheName = cache.getName();
 		this.config = (CassandraCacheStoreConfig) clc;
 	}
 
@@ -83,10 +91,14 @@
 	public void start() throws CacheLoaderException {
 
 		try {
-			pool = new ConnectionPool(config.getPoolProperties());
+			dataSource = new DataSource(config.getPoolProperties());
+			readConsistencyLevel = ConsistencyLevel.valueOf(config.readConsistencyLevel);
+			writeConsistencyLevel = ConsistencyLevel.valueOf(config.writeConsistencyLevel);
 			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
 			entryColumnParent = new ColumnParent(config.entryColumnFamily);
-			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);			
+			entryKeyPrefix = ENTRY_KEY_PREFIX+(config.isSharedKeyspace()?cacheName+"_":"");
+			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+			expirationKey = EXPIRATION_KEY+(config.isSharedKeyspace()?"_"+cacheName:"");
 		} catch (Exception e) {
 			throw new ConfigurationException(e);
 		}
@@ -100,11 +112,11 @@
 
 	@Override
 	public InternalCacheEntry load(Object key) throws CacheLoaderException {
-		String hashKey = CassandraCacheStore.hashKey(key);
+		String hashKey = hashKey(key);
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
-			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+			cassandraClient = dataSource.getConnection();
+			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, readConsistencyLevel);
 			InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
 			if (ice != null && ice.isExpired()) {
 				remove(key);
@@ -117,7 +129,7 @@
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 	}
 
@@ -130,7 +142,7 @@
 	public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
 			SlicePredicate slicePredicate = new SlicePredicate();
 			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
@@ -142,7 +154,7 @@
 				KeyRange keyRange = new KeyRange(sliceSize);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);				
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);				
 
 				// Cycle through all the keys
 				for (KeySlice keySlice : keySlices) {
@@ -183,7 +195,7 @@
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 	}
 
@@ -191,7 +203,7 @@
 	public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			Set<Object> s = new HashSet<Object>();
 			SlicePredicate slicePredicate = new SlicePredicate();
 			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
@@ -202,7 +214,7 @@
 				KeyRange keyRange = new KeyRange(SLICE_SIZE);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -221,7 +233,7 @@
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 	}
 
@@ -231,14 +243,14 @@
 	 */
 	@Override
 	public void stop() {
-		pool.close();
+		
 	}
 
 	@Override
 	public void clear() throws CacheLoaderException {
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			SlicePredicate slicePredicate = new SlicePredicate();
 			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
 			String startKey = "";
@@ -248,7 +260,7 @@
 				KeyRange keyRange = new KeyRange(SLICE_SIZE);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -265,7 +277,7 @@
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 
 	}
@@ -276,16 +288,16 @@
 			log.trace("remove(\"{0}\") ", key);
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
-			remove0(CassandraCacheStore.hashKey(key), mutationMap);
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			remove0(hashKey(key), mutationMap);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 			return true;
 		} catch (Exception e) {
 			log.error("Exception while removing " + key, e);
 			return false;
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 	}
 
@@ -309,15 +321,15 @@
 		Cassandra.Client cassandraClient = null;
 
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
 			store0(entry, mutationMap);
 			
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 	}
 
@@ -325,7 +337,7 @@
 		Object key = entry.getKey();
 		if (trace)
 			log.trace("store(\"{0}\") ", key);
-		String cassandraKey = CassandraCacheStore.hashKey(key);
+		String cassandraKey = hashKey(key);
       try {
          addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
          if (entry.canExpire()) {
@@ -339,7 +351,7 @@
 	
 	private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {		
 		try {
-			addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+			addMutation(mutationMap, expirationKey, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
 		} catch (Exception e) {
 			// Should not happen
 		}
@@ -389,7 +401,7 @@
 
 	/**
 	 * Purge expired entries.
-	 * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+	 * Expiration entries are stored in a single key (expirationKey) within a specific ColumnFamily (set by configuration).
 	 * The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
 	 */
 	@Override
@@ -398,14 +410,14 @@
 			log.trace("purgeInternal");
 		Cassandra.Client cassandraClient = null;
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			// We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
 			SlicePredicate predicate = new SlicePredicate();
 			predicate.setSlice_range(new SliceRange(emptyByteArray, longToBytes(System.currentTimeMillis()), false, SLICE_SIZE));
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
 			for(boolean complete=false; !complete; ) {
 				// Get all columns
-				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, expirationKey, expirationColumnParent, predicate, readConsistencyLevel);
 				complete = slice.size() < SLICE_SIZE;
 				// Delete all keys returned by the slice
 				for(ColumnOrSuperColumn crumb : slice) {
@@ -416,14 +428,14 @@
 						remove0(new String(col.getName(), "UTF-8"), mutationMap);
 					}
 					// Remove the expiration supercolumn
-					addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+					addMutation(mutationMap, expirationKey, config.expirationColumnFamily, scol.getName(), null, null);
 				}				
 			}
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 
 	}
@@ -433,7 +445,7 @@
 		Cassandra.Client cassandraClient = null;
 
 		try {
-			cassandraClient = pool.getConnection();
+			cassandraClient = dataSource.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
 
 			for (Modification m : mods) {
@@ -452,11 +464,11 @@
 				}
 			}
 
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
-			pool.release(cassandraClient);
+			dataSource.releaseConnection(cassandraClient);
 		}
 
 	}
@@ -466,13 +478,13 @@
 		return "CassandraCacheStore";
 	}
 
-	private static String hashKey(Object key) {
-		return ENTRY_KEY_PREFIX + key.toString();
+	private String hashKey(Object key) {
+		return entryKeyPrefix + key.toString();
 	}
 
-	private static String unhashKey(String key) {
-		if(key.startsWith(ENTRY_KEY_PREFIX))
-			return key.substring(ENTRY_KEY_PREFIX.length());
+	private String unhashKey(String key) {
+		if(key.startsWith(entryKeyPrefix))
+			return key.substring(entryKeyPrefix.length());
 		else
 			return null;
 	}

Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-11-02 18:32:46 UTC (rev 2655)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-11-03 09:03:55 UTC (rev 2656)
@@ -8,8 +8,6 @@
  * Configures {@link CassandraCacheStore}.
  */
 public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
-	
-	
 
 	/**
 	 * @configRef desc="The Cassandra keyspace"
@@ -25,8 +23,17 @@
 	 * @configRef desc="The Cassandra column family for expirations"
 	 */
 	String expirationColumnFamily = "InfinispanExpiration";
+	
+	/**
+	 * @configRef desc="Whether the keySpace is shared between multiple caches"
+	 */
+	boolean sharedKeyspace = false;
+	
+	String readConsistencyLevel = "ONE";
+	
+	String writeConsistencyLevel = "ONE";
 
-	PoolProperties poolProperties;
+	protected PoolProperties poolProperties;
 
 	public CassandraCacheStoreConfig() {
 		setCacheLoaderClassName(CassandraCacheStore.class.getName());
@@ -57,6 +64,30 @@
 		this.expirationColumnFamily = expirationColumnFamily;
 	}
 
+	public boolean isSharedKeyspace() {
+		return sharedKeyspace;
+	}
+
+	public void setSharedKeyspace(boolean sharedKeyspace) {
+		this.sharedKeyspace = sharedKeyspace;
+	}
+
+	public String getReadConsistencyLevel() {
+		return readConsistencyLevel;
+	}
+
+	public void setReadConsistencyLevel(String readConsistencyLevel) {
+		this.readConsistencyLevel = readConsistencyLevel;
+	}
+
+	public String getWriteConsistencyLevel() {
+		return writeConsistencyLevel;
+	}
+
+	public void setWriteConsistencyLevel(String writeConsistencyLevel) {
+		this.writeConsistencyLevel = writeConsistencyLevel;
+	}
+
 	public PoolProperties getPoolProperties() {
 		return poolProperties;
 	}
@@ -77,258 +108,37 @@
 		return poolProperties.getPort();
 	}
 
-	
-	public int getAbandonWhenPercentageFull() {
-		return poolProperties.getAbandonWhenPercentageFull();
-	}
-
-	
 	public boolean isFramed() {
 		return poolProperties.isFramed();
 	}
 
-	
-	public int getInitialSize() {
-		return poolProperties.getInitialSize();
-	}
-
-	
-	public int getMaxActive() {
-		return poolProperties.getMaxActive();
-	}
-
-	
-	public long getMaxAge() {
-		return poolProperties.getMaxAge();
-	}
-
-	
-	public int getMaxIdle() {
-		return poolProperties.getMaxIdle();
-	}
-
-	
-	public int getMaxWait() {
-		return poolProperties.getMaxWait();
-	}
-
-	
-	public int getMinEvictableIdleTimeMillis() {
-		return poolProperties.getMinEvictableIdleTimeMillis();
-	}
-
-	
-	public int getMinIdle() {
-		return poolProperties.getMinIdle();
-	}
-
-	
-	public String getName() {
-		return poolProperties.getName();
-	}
-
-	
-	public int getNumTestsPerEvictionRun() {
-		return poolProperties.getNumTestsPerEvictionRun();
-	}
-
-	
 	public String getPassword() {
 		return poolProperties.getPassword();
 	}
-	
-	public int getRemoveAbandonedTimeout() {
-		return poolProperties.getRemoveAbandonedTimeout();
-	}
 
-	
-	public int getSuspectTimeout() {
-		return poolProperties.getSuspectTimeout();
-	}
-
-	
-	public int getTimeBetweenEvictionRunsMillis() {
-		return poolProperties.getTimeBetweenEvictionRunsMillis();
-	}
-
-	
-	public boolean getUseLock() {
-		return poolProperties.getUseLock();
-	}
-
-	
 	public String getUsername() {
 		return poolProperties.getUsername();
 	}
 
-	
-	public long getValidationInterval() {
-		return poolProperties.getValidationInterval();
-	}
-	
-	public boolean isFairQueue() {
-		return poolProperties.isFairQueue();
-	}
-
-	
-	public boolean isJmxEnabled() {
-		return poolProperties.isJmxEnabled();
-	}
-
-	
-	public boolean isLogAbandoned() {
-		return poolProperties.isLogAbandoned();
-	}
-
-	
-	public boolean isRemoveAbandoned() {
-		return poolProperties.isRemoveAbandoned();
-	}
-
-	
-	public boolean isTestOnBorrow() {
-		return poolProperties.isTestOnBorrow();
-	}
-
-	
-	public boolean isTestOnConnect() {
-		return poolProperties.isTestOnConnect();
-	}
-
-	
-	public boolean isTestOnReturn() {
-		return poolProperties.isTestOnReturn();
-	}
-
-	
-	public boolean isTestWhileIdle() {
-		return poolProperties.isTestWhileIdle();
-	}
-
-	
-	public void setAbandonWhenPercentageFull(int percentage) {
-		poolProperties.setAbandonWhenPercentageFull(percentage);
-
-	}
-	
-	public void setFairQueue(boolean fairQueue) {
-		poolProperties.setFairQueue(fairQueue);
-
-	}
-
-	
 	public void setFramed(boolean framed) {
 		poolProperties.setFramed(framed);
 
 	}
 
-	
-	public void setInitialSize(int initialSize) {
-		poolProperties.setInitialSize(initialSize);
-
-	}
-
-	
-	public void setJmxEnabled(boolean jmxEnabled) {
-		poolProperties.setJmxEnabled(jmxEnabled);
-	}
-
-	
-	public void setLogAbandoned(boolean logAbandoned) {
-		poolProperties.setLogAbandoned(logAbandoned);
-	}
-
-	
-	public void setMaxActive(int maxActive) {
-		poolProperties.setMaxActive(maxActive);
-
-	}
-
-	
-	public void setMaxAge(long maxAge) {
-		poolProperties.setMaxAge(maxAge);
-
-	}
-
-	
-	public void setMaxIdle(int maxIdle) {
-		poolProperties.setMaxIdle(maxIdle);
-
-	}
-
-	
-	public void setMaxWait(int maxWait) {
-		poolProperties.setMaxWait(maxWait);
-
-	}
-
-	
-	public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
-		poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
-
-	}
-
-	public void setMinIdle(int minIdle) {
-		poolProperties.setMinIdle(minIdle);
-
-	}
-
-	public void setName(String name) {
-		poolProperties.setName(name);
-	}
-
-	public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
-		poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
-
-	}
-
 	public void setPassword(String password) {
 		poolProperties.setPassword(password);
 	}
 
-	public void setRemoveAbandoned(boolean removeAbandoned) {
-		poolProperties.setRemoveAbandoned(removeAbandoned);
+	public void setUsername(String username) {
+		poolProperties.setUsername(username);
 	}
-
 	
-	public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
-		poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
-
+	public void setDatasourceJndiLocation(String location) {
+		poolProperties.setDataSourceJNDI(location);
 	}
-
-	public void setSuspectTimeout(int seconds) {
-		poolProperties.setSuspectTimeout(seconds);
-
+	
+	public String getDatasourceJndiLocation() {
+		return poolProperties.getDataSourceJNDI();
 	}
 
-	public void setTestOnBorrow(boolean testOnBorrow) {
-		poolProperties.setTestOnBorrow(testOnBorrow);
-
-	}
-
-	public void setTestOnConnect(boolean testOnConnect) {
-		poolProperties.setTestOnConnect(testOnConnect);
-
-	}
-
-	public void setTestOnReturn(boolean testOnReturn) {
-		poolProperties.setTestOnReturn(testOnReturn);
-	}
-
-	public void setTestWhileIdle(boolean testWhileIdle) {
-		poolProperties.setTestWhileIdle(testWhileIdle);
-	}
-
-	public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
-		poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
-
-	}
-
-	public void setUsername(String username) {
-		poolProperties.setUsername(username);
-	}
-
-	public void setValidationInterval(long validationInterval) {
-		poolProperties.setValidationInterval(validationInterval);
-	}
 }



More information about the infinispan-commits mailing list