[jboss-svn-commits] JBL Code SVN: r11099 - labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Apr 18 12:57:30 EDT 2007
Author: driedtoast
Date: 2007-04-18 12:57:30 -0400 (Wed, 18 Apr 2007)
New Revision: 11099
Added:
labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java
labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java
Log:
Added: labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java (rev 0)
+++ labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/AbstractListener.java 2007-04-18 16:57:30 UTC (rev 11099)
@@ -0,0 +1,278 @@
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ListenerUtil;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.message.MessageComposer;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
+
+/**
+ * Base class for gateway listeners that put messages onto the bus.
+ * <p>
+ * It reads the target service from the configuration, sets up the message
+ * composer and runs the fetch/compose/deliver loop. Subclasses supply the
+ * payload through {@link #doPreSend()} and a fallback composer through
+ * {@link #createDefaultComposer(ConfigTree)}.
+ *
+ * @author danielmarchant
+ */
+public abstract class AbstractListener extends AbstractThreadedManagedLifecycle {
+
+    /**
+     * Creates the listener and configures it from {@code config}.
+     * <p>
+     * Note: this calls the overridable {@link #initListener(ConfigTree)}, so an
+     * override runs before the subclass's own field initialisers have executed.
+     *
+     * @param config listener configuration
+     * @throws ConfigurationException if the composer cannot be set up
+     */
+    protected AbstractListener(ConfigTree config) throws ConfigurationException {
+        super(config);
+        initListener(config);
+    }
+
+    /**
+     * Reads the target service category and name from {@code config} and
+     * creates the message composer.
+     *
+     * @param config listener configuration
+     * @throws ConfigurationException if the composer cannot be set up
+     */
+    protected void initListener(ConfigTree config) throws ConfigurationException {
+        setTargetServiceCategory(ListenerUtil.obtainAtt(config,
+                ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null));
+        setTargetServiceName(ListenerUtil.obtainAtt(config,
+                ListenerTagNames.TARGET_SERVICE_NAME_TAG, null));
+        createComposer(config);
+    }
+
+    /**
+     * Resolves the composer instance and the method used to compose messages.
+     * <p>
+     * If the GATEWAY_COMPOSER_CLASS_TAG attribute is set, that class is
+     * instantiated and GATEWAY_COMPOSER_METHOD_TAG names the method ("compose"
+     * is passed as the default); otherwise createDefaultComposer() decides both.
+     * The method is looked up as a public method taking a single Object.
+     *
+     * @param config listener configuration
+     * @throws ConfigurationException wrapping any failure to load, instantiate
+     *         or look up the composer
+     */
+    protected void createComposer(ConfigTree config) throws ConfigurationException {
+        try {
+            String sProcessMethod;
+            _composerName = config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+            if (null != _composerName) {
+                _composerClass = Class.forName(_composerName);
+                _composer = (MessageComposer) _composerClass.newInstance();
+                sProcessMethod = config.getAttribute(
+                        ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "compose");
+            } else {
+                sProcessMethod = createDefaultComposer(config);
+                _logger.debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG
+                        + "> element found in configuration"
+                        + " - Using default composer class : " + _composerName);
+            }
+            _processMethod = _composerClass.getMethod(sProcessMethod,
+                    new Class[] { Object.class });
+        } catch (Exception ex) {
+            throw new ConfigurationException(ex);
+        }
+    }
+
+    /**
+     * Installs the default composer. createComposer() relies on this to set
+     * {@link #_composerClass}, and logs {@link #_composerName} afterwards.
+     *
+     * @param config listener configuration
+     * @return name of the composer method to look up
+     */
+    protected abstract String createDefaultComposer(ConfigTree config);
+
+    /**
+     * Obtains the next object to hand to the composer.
+     *
+     * @return the payload, or {@code null} when there is nothing to send
+     */
+    protected abstract Object doPreSend();
+
+    /**
+     * Hook called after every delivery attempt, whether or not it succeeded.
+     * The default implementation does nothing.
+     *
+     * @param msg the message that was handed to the courier
+     */
+    protected void doPostSend(Message msg) {
+        // do nothing
+    }
+
+    /**
+     * Main loop: fetch a payload, compose a message and offer it to the target
+     * EPRs until one courier accepts it. Runs at least once and repeats while
+     * {@link #isRunning()} is true; failures are logged, never propagated.
+     */
+    @Override
+    protected void doRun() {
+        if (_logger.isDebugEnabled()) {
+            _logger.debug("run() method of " + this.getClass().getSimpleName()
+                    + " started on thread " + Thread.currentThread().getName());
+        }
+        do {
+            try {
+                Object objToConvert = doPreSend();
+                if (null == objToConvert) {
+                    // Nothing was obtained (e.g. the wait was interrupted); composing
+                    // from null would put a payload-less message on the bus.
+                    continue;
+                }
+                Object obj = _processMethod.invoke(_composer,
+                        new Object[] { objToConvert });
+                if (null == obj) {
+                    _logger.warn("Action class method <" + _processMethod
+                            .getName() + "> returned a null object");
+                    continue;
+                }
+                Message msg = (Message) obj;
+
+                boolean bSent = false;
+                for (EPR current : _targetEprs) {
+                    // Failures are handled per EPR so that one bad endpoint does
+                    // not stop the remaining endpoints from being tried.
+                    try {
+                        Courier courier = CourierFactory.getCourier(current);
+                        try {
+                            bSent = courier.deliver(msg);
+                        } finally {
+                            CourierUtil.cleanCourier(courier);
+                            doPostSend(msg);
+                        }
+                    } catch (CourierException e) {
+                        _logger.error(" Courier is not found for target ", e);
+                    } catch (MalformedEPRException e) {
+                        _logger.error("Problems with the EPR ", e);
+                    }
+                    if (bSent) {
+                        break;
+                    }
+                }
+                if (!bSent) {
+                    // Reported directly: no exception is raised just to be caught
+                    // by the enclosing handler.
+                    _logger.error("Message could not be delivered to target service <"
+                            + _targetServiceCategory + "," + _targetServiceName + ">");
+                }
+            } catch (Exception e) {
+                _logger.error("Unexpected error has occurred", e);
+            }
+        } while (isRunning());
+    }
+
+    /**
+     * Looks up the target EPRs for the configured service in the registry.
+     *
+     * @throws ManagedLifecycleException if no EPR is returned or the registry
+     *         lookup fails
+     */
+    @Override
+    protected void doInitialise() throws ManagedLifecycleException {
+        try {
+            _targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
+            if (null == _targetEprs || _targetEprs.size() < 1) {
+                throw new ManagedLifecycleException("EPR <" + _targetServiceName
+                        + "> not found in registry");
+            }
+        } catch (final RegistryException re) {
+            throw new ManagedLifecycleException("Unexpected registry exception", re);
+        }
+    }
+
+    /** Returns the category of the service that messages are delivered to. */
+    public String getTargetServiceCategory() {
+        return _targetServiceCategory;
+    }
+
+    /** Sets the category of the service that messages are delivered to. */
+    public void setTargetServiceCategory(String serviceCategory) {
+        _targetServiceCategory = serviceCategory;
+    }
+
+    /** Returns the name of the service that messages are delivered to. */
+    public String getTargetServiceName() {
+        return _targetServiceName;
+    }
+
+    /** Sets the name of the service that messages are delivered to. */
+    public void setTargetServiceName(String serviceName) {
+        _targetServiceName = serviceName;
+    }
+
+    /** Returns the composer instance whose method doRun() invokes. */
+    public MessageComposer getComposer() {
+        return _composer;
+    }
+
+    /** Replaces the composer instance; the compose method is not looked up again. */
+    public void setComposer(MessageComposer composer) {
+        this._composer = composer;
+    }
+
+    /** Returns the class the composer method was looked up on. */
+    public Class<?> getComposerClass() {
+        return _composerClass;
+    }
+
+    /** Sets the composer class; does not re-run the method lookup. */
+    public void setComposerClass(Class<?> class1) {
+        _composerClass = class1;
+    }
+
+    /** Returns the composer class name held by this listener. */
+    public String getComposerName() {
+        return _composerName;
+    }
+
+    /** Sets the composer class name; no class is loaded as a result. */
+    public void setComposerName(String name) {
+        _composerName = name;
+    }
+
+    /** Returns the EPRs that doRun() attempts delivery to. */
+    public Collection<EPR> getTargetEprs() {
+        return _targetEprs;
+    }
+
+    /** Replaces the EPRs that doRun() attempts delivery to. */
+    public void setTargetEprs(Collection<EPR> eprs) {
+        _targetEprs = eprs;
+    }
+
+    /** Delivery targets; populated by doInitialise() from the registry. */
+    protected Collection<EPR> _targetEprs;
+    /** Target service coordinates read from the configuration. */
+    protected String _targetServiceCategory, _targetServiceName;
+    /** Composer class name from the configuration, or as set by the default hook. */
+    protected String _composerName;
+    /** Composer instance that the compose method is invoked on. */
+    protected MessageComposer _composer;
+    /** Class on which the compose method is looked up. */
+    protected Class<?> _composerClass;
+    /** Reflective handle to the compose method, resolved by createComposer(). */
+    protected Method _processMethod;
+    private static final Logger _logger = Logger.getLogger(AbstractListener.class);
+}
Added: labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java
===================================================================
--- labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java (rev 0)
+++ labs/jbossesb/workspace/dmarchant/4.2m2/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/HibernateGatewayListener.java 2007-04-18 16:57:30 UTC (rev 11099)
@@ -0,0 +1,286 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.soa.esb.listeners.gateway;
+
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.event.AbstractEvent;
+import org.hibernate.event.DeleteEvent;
+import org.hibernate.event.DeleteEventListener;
+import org.hibernate.event.EventListeners;
+import org.hibernate.event.EventSource;
+import org.hibernate.event.LoadEvent;
+import org.hibernate.event.LoadEventListener;
+import org.hibernate.event.SaveOrUpdateEvent;
+import org.hibernate.event.SaveOrUpdateEventListener;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.AbstractMessageComposer;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Gateway that listens for Hibernate delete, save-or-update and load events
+ * and queues them so that the listener thread can turn each one into a message
+ * for the bus.
+ *
+ * @author danielmarchant
+ */
+public class HibernateGatewayListener extends AbstractListener
+        implements DeleteEventListener,
+        SaveOrUpdateEventListener,
+        LoadEventListener {
+
+    /**
+     * Creates the gateway; the superclass constructor calls
+     * {@link #initListener(ConfigTree)} before this class's fields are assigned.
+     *
+     * @param config listener configuration
+     * @throws ConfigurationException if initialisation fails
+     */
+    protected HibernateGatewayListener(ConfigTree config) throws ConfigurationException {
+        super(config);
+    }
+
+    /**
+     * Installs {@link HibernateMessageComposer} as the composer.
+     *
+     * @param config listener configuration (not used)
+     * @return {@code "compose"}, the composer method name
+     */
+    @Override
+    protected String createDefaultComposer(ConfigTree config) {
+        _composerName = HibernateMessageComposer.class.getName();
+        _composerClass = HibernateMessageComposer.class;
+        _composer = new HibernateMessageComposer();
+        return "compose";
+    }
+
+    /**
+     * Blocks until a Hibernate event is available on the queue.
+     *
+     * @return the next queued event, or {@code null} if the wait was interrupted
+     */
+    @Override
+    protected Object doPreSend() {
+        try {
+            return queue.take();
+        } catch (InterruptedException e) {
+            // The interrupt flag is not re-asserted: doRun() loops straight back
+            // into take(), which would then fail immediately on every pass.
+            _logger.warn(" interrupted while getting event off queue " , e);
+            return null;
+        }
+    }
+
+    /**
+     * Performs the base initialisation (target service and composer) and then
+     * appends this gateway to the delete, save and load listeners of a Hibernate
+     * {@link Configuration}.
+     * <p>
+     * NOTE(review): the Configuration is created here and not kept or returned;
+     * confirm how these registrations are meant to reach a SessionFactory.
+     *
+     * @param config listener configuration
+     * @throws ConfigurationException if the base initialisation fails
+     */
+    @Override
+    protected void initListener(ConfigTree config) throws ConfigurationException {
+        // Without this the composer and target service are never set up.
+        super.initListener(config);
+
+        // setup gateway as a listener to hibernate
+        Configuration cfg = new Configuration();
+        EventListeners listeners = cfg.getEventListeners();
+
+        listeners.setDeleteEventListeners(appendListener(
+                DeleteEventListener.class, listeners.getDeleteEventListeners(), this));
+        listeners.setSaveEventListeners(appendListener(
+                SaveOrUpdateEventListener.class, listeners.getSaveEventListeners(), this));
+        listeners.setLoadEventListeners(appendListener(
+                LoadEventListener.class, listeners.getLoadEventListeners(), this));
+
+        // TODO post listeners
+        // TODO pre listeners
+    }
+
+    /**
+     * Returns a copy of {@code existing} that is one element longer and has
+     * {@code listener} in the last slot; a {@code null} array counts as empty.
+     *
+     * @param type component type of the array to create
+     * @param existing currently registered listeners, may be {@code null}
+     * @param listener listener to append
+     * @return the new array
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> T[] appendListener(Class<T> type, T[] existing, T listener) {
+        int oldLength = (existing == null) ? 0 : existing.length;
+        // Array.newInstance(type, n) yields a T[]; the cast cannot fail.
+        T[] grown = (T[]) java.lang.reflect.Array.newInstance(type, oldLength + 1);
+        if (oldLength > 0) {
+            System.arraycopy(existing, 0, grown, 0, oldLength);
+        }
+        grown[oldLength] = listener;
+        return grown;
+    }
+
+    /** Per-instance logger; not yet assigned while the superclass constructor runs. */
+    protected Logger _logger = Logger.getLogger(HibernateGatewayListener.class);
+    /** Events waiting to be composed; filled by the Hibernate callbacks. */
+    protected BlockingQueue<AbstractEvent> queue = new LinkedBlockingQueue<AbstractEvent>();
+
+    // Hibernate based event methods......
+
+    /**
+     * Queues a delete event for the listener thread.
+     *
+     * @param event the Hibernate delete event
+     */
+    public void onDelete(DeleteEvent event) throws HibernateException {
+        if (_logger.isDebugEnabled()) {
+            _logger.debug(" Receiving delete event: " + event);
+        }
+        enqueue(event, "onDelete");
+    }
+
+    /**
+     * Queues a save-or-update event for the listener thread.
+     *
+     * @param event the Hibernate save-or-update event
+     */
+    public void onSaveOrUpdate(SaveOrUpdateEvent event) throws HibernateException {
+        if (_logger.isDebugEnabled()) {
+            _logger.debug(" Receiving save or update event: " + event);
+        }
+        enqueue(event, "onSaveOrUpdate");
+    }
+
+    /**
+     * Queues a load event for the listener thread.
+     *
+     * @param event the Hibernate load event
+     * @param type the kind of load; currently ignored
+     */
+    public void onLoad(LoadEvent event, LoadType type) throws HibernateException {
+        // TODO do something with the type : type.getName()
+        if (_logger.isDebugEnabled()) {
+            _logger.debug(" Receiving load event: " + event);
+        }
+        enqueue(event, "onLoad");
+    }
+
+    /**
+     * Puts {@code event} on the queue. If the calling thread is interrupted the
+     * event is dropped, the fact is logged and the interrupt flag is restored
+     * so that the code that triggered the Hibernate operation can still see it.
+     *
+     * @param event the event to queue
+     * @param handler name of the calling handler, used in the log message
+     */
+    private void enqueue(AbstractEvent event, String handler) {
+        try {
+            queue.put(event);
+        } catch (InterruptedException e) {
+            _logger.info(" Event was interrupted " + handler + " on adding to queue", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Default composer: copies details of a Hibernate event onto the message
+     * properties.
+     */
+    public static class HibernateMessageComposer extends AbstractMessageComposer {
+
+        /**
+         * Sets the properties {@code eventSource}, {@code objectInstance},
+         * {@code entityId}, {@code requestId}, {@code entityName} and
+         * {@code type} from {@code obj}; only non-null values are set, and
+         * nothing is set when {@code obj} is not one of the three event types.
+         *
+         * @param message message to populate
+         * @param obj a DeleteEvent, SaveOrUpdateEvent or LoadEvent
+         */
+        @Override
+        protected void populateMessage(Message message, Object obj) throws MessageDeliverException {
+            Object instanceObj = null;
+            String entityName = null;
+            Serializable entityId = null;
+            EventSource source = null;
+            Serializable requestId = null;
+            String type = null;
+
+            if (obj instanceof DeleteEvent) {
+                DeleteEvent devent = (DeleteEvent) obj;
+                entityName = devent.getEntityName();
+                instanceObj = devent.getObject();
+                source = devent.getSession();
+                type = "delete";
+            } else if (obj instanceof SaveOrUpdateEvent) {
+                SaveOrUpdateEvent sevent = (SaveOrUpdateEvent) obj;
+                entityName = sevent.getEntityName();
+                instanceObj = sevent.getObject();
+                source = sevent.getSession();
+                entityId = sevent.getResultId();
+                requestId = sevent.getRequestedId();
+                type = "saveOrUpdate";
+            } else if (obj instanceof LoadEvent) {
+                LoadEvent levent = (LoadEvent) obj;
+                instanceObj = levent.getInstanceToLoad();
+                entityName = levent.getEntityClassName();
+                entityId = levent.getEntityId();
+                // levent.getResult() is not copied onto the message.
+                source = levent.getSession();
+                type = "load";
+            }
+
+            if (source != null) {
+                message.getProperties().setProperty("eventSource", source);
+            }
+            if (instanceObj != null) {
+                message.getProperties().setProperty("objectInstance", instanceObj);
+            }
+            if (entityId != null) {
+                message.getProperties().setProperty("entityId", entityId);
+            }
+            if (requestId != null) {
+                message.getProperties().setProperty("requestId", requestId);
+            }
+            if (entityName != null) {
+                message.getProperties().setProperty("entityName", entityName);
+            }
+            if (type != null) {
+                message.getProperties().setProperty("type", type);
+            }
+        }
+
+        /** Intentionally empty: the supplied configuration is ignored. */
+        public void setConfiguration(ConfigTree config) {
+        }
+    }
+}
More information about the jboss-svn-commits
mailing list