`
kanpiaoxue
  • 浏览: 1746663 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

分级事件驱动架构 SEDA(Staged Event-Driven Architecture)的构造类

 
阅读更多
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;

/**
 * One stage of a Staged Event-Driven Architecture (SEDA) pipeline.
 *
 * <p>
 * A stage starts {@code threadPoolSize} worker loops on the supplied executor.
 * Each worker repeatedly takes a {@link Task} from the input queue, executes
 * it and, if the task reports success and an output queue was supplied,
 * forwards the task to that output queue.
 * </p>
 *
 * <p>
 * When a conflict map is supplied, a task that is equal to a key already
 * present in the map is abandoned without being executed. Otherwise the worker
 * registers the task in the map while it executes and removes it afterwards.
 * Only the worker that registered an entry removes it, so entries owned by
 * another worker (or pre-filled by the caller) are left untouched.
 * </p>
 *
 * <p>
 * Workers run until their thread is interrupted, for example through
 * {@link ExecutorService#shutdownNow()}.
 * </p>
 *
 * @author kanpiaoxue
 * @since 2013-11-07
 */
public final class SEDAThreadPool {
	private static final Logger LOGGER = Logger.getLogger(SEDAThreadPool.class);
	private final BlockingQueue<SEDAThreadPool.Task> inputQueue;
	private final BlockingQueue<SEDAThreadPool.Task> outputQueue;
	private final ExecutorService exec;
	private final String threadName;
	private final int threadPoolSize;
	private final ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap;
	private final boolean hasOutput;
	private final boolean hasConflictMap;

	/**
	 * Creates a stage with an output queue and without a conflict map.
	 *
	 * @param threadName
	 *            name prefix of the worker threads
	 * @param inputQueue
	 *            queue the workers take tasks from; must not be null
	 * @param outputQueue
	 *            queue successful tasks are forwarded to; may be null when the
	 *            stage has no output
	 * @param exec
	 *            executor running the workers; when null,
	 *            {@code Executors.newCachedThreadPool()} is used
	 * @param threadPoolSize
	 *            number of workers to start
	 */
	public SEDAThreadPool(String threadName, BlockingQueue<Task> inputQueue,
			BlockingQueue<Task> outputQueue, ExecutorService exec,
			int threadPoolSize) {
		this(threadName, inputQueue, outputQueue, exec, threadPoolSize, null);
	}

	/**
	 * Creates a stage without an output queue and without a conflict map.
	 *
	 * @param threadName
	 *            name prefix of the worker threads
	 * @param inputQueue
	 *            queue the workers take tasks from; must not be null
	 * @param exec
	 *            executor running the workers; when null,
	 *            {@code Executors.newCachedThreadPool()} is used
	 * @param threadPoolSize
	 *            number of workers to start
	 */
	public SEDAThreadPool(String threadName, BlockingQueue<Task> inputQueue,
			ExecutorService exec, int threadPoolSize) {
		this(threadName, inputQueue, null, exec, threadPoolSize, null);
	}

	/**
	 * Creates a stage without an output queue but with a conflict map.
	 *
	 * @param threadName
	 *            name prefix of the worker threads
	 * @param inputQueue
	 *            queue the workers take tasks from; must not be null
	 * @param exec
	 *            executor running the workers; when null,
	 *            {@code Executors.newCachedThreadPool()} is used
	 * @param threadPoolSize
	 *            number of workers to start
	 * @param conflictMap
	 *            conflict map; tasks equal to a key already in this map are
	 *            not executed. May be null to disable conflict detection
	 */
	public SEDAThreadPool(
			String threadName,
			BlockingQueue<Task> inputQueue,
			ExecutorService exec,
			int threadPoolSize,
			ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap) {
		this(threadName, inputQueue, null, exec, threadPoolSize, conflictMap);
	}

	/**
	 * Creates a stage with every option spelled out.
	 *
	 * @param threadName
	 *            name prefix of the worker threads
	 * @param inputQueue
	 *            queue the workers take tasks from; must not be null
	 * @param outputQueue
	 *            queue successful tasks are forwarded to; may be null when the
	 *            stage has no output
	 * @param exec
	 *            executor running the workers; when null,
	 *            {@code Executors.newCachedThreadPool()} is used
	 * @param threadPoolSize
	 *            number of workers to start
	 * @param conflictMap
	 *            conflict map; tasks equal to a key already in this map are
	 *            not executed. May be null to disable conflict detection
	 * @throws NullPointerException
	 *             if {@code inputQueue} is null
	 */
	public SEDAThreadPool(
			String threadName,
			BlockingQueue<SEDAThreadPool.Task> inputQueue,
			BlockingQueue<SEDAThreadPool.Task> outputQueue,
			ExecutorService exec,
			int threadPoolSize,
			ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap) {
		super();
		// Fail fast: with a null input queue every worker would spin forever,
		// logging a NullPointerException on each iteration.
		if (null == inputQueue) {
			throw new NullPointerException("inputQueue is null");
		}
		this.threadName = threadName;
		this.inputQueue = inputQueue;
		this.outputQueue = outputQueue;
		this.exec = (null == exec) ? Executors.newCachedThreadPool() : exec;
		this.threadPoolSize = threadPoolSize;
		this.conflictMap = conflictMap;
		this.hasConflictMap = (null != conflictMap);
		this.hasOutput = (null != outputQueue);
	}

	/**
	 * Starts the workers of this stage by submitting {@code threadPoolSize}
	 * worker loops to the executor. Worker {@code i} is named
	 * {@code threadName + i}.
	 */
	public void createSEDAThreadPool() {
		for (int i = 0; i < threadPoolSize; i++) {
			exec.execute(new InnerConsumer(threadName + i, inputQueue,
					outputQueue));
		}
	}

	/**
	 * Worker loop: takes tasks from the input queue, executes them and
	 * forwards successful ones to the output queue.
	 */
	private class InnerConsumer implements Runnable {
		private final String name;
		private final BlockingQueue<SEDAThreadPool.Task> inputQueue;
		private final BlockingQueue<SEDAThreadPool.Task> outputQueue;

		public InnerConsumer(String name,
				BlockingQueue<SEDAThreadPool.Task> inputQueue,
				BlockingQueue<SEDAThreadPool.Task> outputQueue) {
			super();
			this.name = name;
			this.inputQueue = inputQueue;
			this.outputQueue = outputQueue;
		}

		@Override
		public void run() {
			Thread.currentThread().setName(name);
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info(name + " start to work.");
			}
			while (!Thread.currentThread().isInterrupted()) {
				try {
					processNextTask();
				} catch (InterruptedException e) {
					// Restore the flag so the loop condition (and the
					// executor) can observe the interruption and stop.
					Thread.currentThread().interrupt();
				} catch (Exception e) {
					// A failing task must not kill the worker.
					LOGGER.error("Error:" + e.getMessage(), e);
				}
			}
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info(name + " is interrupted, stop working.");
			}
		}

		/**
		 * Takes one task from the input queue and runs it through the stage.
		 *
		 * @throws InterruptedException
		 *             if the worker is interrupted while waiting on a queue
		 */
		private void processNextTask() throws InterruptedException {
			SEDAThreadPool.Task inputTask = inputQueue.take();
			if (!register(inputTask)) {
				// Somebody else owns the conflict entry: leave it alone.
				return;
			}
			SEDAThreadPool.Task outputTask;
			try {
				outputTask = consume(inputTask);
			} finally {
				// Release before forwarding so that a downstream stage sharing
				// the same conflict map does not treat the forwarded task as
				// still being in flight.
				release(inputTask);
			}
			if (hasOutput && null != outputTask) {
				outputQueue.put(outputTask);
				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(outputTask + " put into outputQueue.");
				}
			}
		}

		/**
		 * Registers the task in the conflict map, if one is configured.
		 *
		 * @return {@code false} if an equal task is already in the conflict
		 *         map and the task must be abandoned, {@code true} otherwise
		 */
		private boolean register(SEDAThreadPool.Task task) {
			if (!hasConflictMap) {
				return true;
			}
			if (null != conflictMap.putIfAbsent(task, task)) {
				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(task
							+ " has found in conflictMap, it is abandoned.");
				}
				return false;
			}
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(task + " is put into conflictMap.");
			}
			return true;
		}

		/**
		 * Removes the entry this worker registered for the task. Must only be
		 * called after {@link #register(Task)} returned {@code true}.
		 */
		private void release(SEDAThreadPool.Task task) {
			if (hasConflictMap && null != conflictMap.remove(task)) {
				if (LOGGER.isDebugEnabled()) {
					LOGGER.debug(task + " is removed from conflictMap.");
				}
			}
		}

		/**
		 * Executes the task.
		 *
		 * @return the task itself if {@code execute()} returned true,
		 *         otherwise null
		 */
		private SEDAThreadPool.Task consume(SEDAThreadPool.Task task) {
			if (LOGGER.isDebugEnabled()) {
				LOGGER.debug(task + " takes from inputQueue.");
			}
			return task.execute() ? task : null;
		}
	}

	/**
	 * Unit of work processed by a SEDA stage.
	 *
	 * @author kanpiaoxue
	 */
	public interface Task {
		/**
		 * Performs the work of this task.
		 *
		 * @return true if the task should be forwarded to the output queue of
		 *         the stage (when the stage has one), false otherwise
		 */
		public boolean execute();
	}

}

 

 

 

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.wanmei.parallel.seda.SEDAThreadPool.Task;

/**
 * Demo wiring three {@link SEDAThreadPool} stages into a pipeline that is fed
 * by a single producer.
 */
public class Test {

	/**
	 * Builds the three-stage pipeline, starts the producer and then asks the
	 * executor to shut down once its (endless) tasks finish.
	 *
	 * @param args
	 *            ignored
	 */
	public static void main(String[] args) {
		BlockingQueue<SEDAThreadPool.Task> inputQueue = new LinkedBlockingQueue<SEDAThreadPool.Task>();
		BlockingQueue<SEDAThreadPool.Task> inputQueue2 = new LinkedBlockingQueue<SEDAThreadPool.Task>();
		BlockingQueue<SEDAThreadPool.Task> inputQueue3 = new LinkedBlockingQueue<SEDAThreadPool.Task>();

		ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap = test();
		int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4;

		ExecutorService exec = Executors.newCachedThreadPool();

		// Stage 1 feeds stage 2, stage 2 feeds stage 3, stage 3 has no output.
		SEDAThreadPool pool = new SEDAThreadPool("consumer-1-", inputQueue,
				inputQueue2, exec, threadPoolSize, conflictMap);
		pool.createSEDAThreadPool();

		SEDAThreadPool pool2 = new SEDAThreadPool("consumer-2-", inputQueue2,
				inputQueue3, exec, threadPoolSize, conflictMap);
		pool2.createSEDAThreadPool();

		SEDAThreadPool pool3 = new SEDAThreadPool("consumer-3-", inputQueue3,
				exec, threadPoolSize, conflictMap);
		pool3.createSEDAThreadPool();

		exec.execute(new Producer("producer", inputQueue));

		exec.shutdown();

	}

	/**
	 * Builds a conflict map pre-filled with the tasks "task-0" to "task-9".
	 * Task equality is name-based, so these entries are equal to the tasks
	 * the producer emits.
	 *
	 * @return the pre-filled conflict map
	 */
	private static ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> test() {
		ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task> conflictMap = new ConcurrentHashMap<SEDAThreadPool.Task, SEDAThreadPool.Task>();
		for (int i = 0; i < 10; i++) {
			TaskImpl t = new TaskImpl("task-" + i);
			conflictMap.put(t, t);
		}
		return conflictMap;
	}

	/**
	 * Puts a batch of ten tasks into the queue every ten seconds until the
	 * thread is interrupted.
	 */
	private static class Producer implements Runnable {
		private final String name;
		private final BlockingQueue<SEDAThreadPool.Task> outputQueue;

		public Producer(String name, BlockingQueue<Task> outputQueue) {
			super();
			this.name = name;
			this.outputQueue = outputQueue;
		}

		@Override
		public void run() {
			Thread.currentThread().setName(name);
			while (!Thread.currentThread().isInterrupted()) {
				try {
					for (SEDAThreadPool.Task t : getTasks()) {
						outputQueue.put(t);
					}
					TimeUnit.SECONDS.sleep(10);
				} catch (InterruptedException e) {
					// Restore the flag and stop producing instead of
					// swallowing the interruption and looping forever.
					Thread.currentThread().interrupt();
				}
			}
		}

		/** Creates the batch "task-0" to "task-9". */
		private List<SEDAThreadPool.Task> getTasks() {
			List<SEDAThreadPool.Task> list = new ArrayList<SEDAThreadPool.Task>();
			for (int i = 0; i < 10; i++) {
				list.add(new TaskImpl("task-" + i));
			}
			return list;
		}
	}

	/**
	 * Demo task identified by its name.
	 *
	 * <p>
	 * The class is a static nested class on purpose: as a non-static inner
	 * class its equality also compared the enclosing {@code Test} instance, so
	 * two tasks with the same name created from different {@code Test}
	 * instances were never equal and could never collide in the conflict map.
	 * </p>
	 */
	private static class TaskImpl implements SEDAThreadPool.Task {
		private final String name;

		public TaskImpl(String name) {
			super();
			this.name = name;
		}

		@Override
		public int hashCode() {
			final int prime = 37;
			int result = 17;
			result = prime * result + ((name == null) ? 0 : name.hashCode());
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj) {
				return true;
			}
			if (obj == null || getClass() != obj.getClass()) {
				return false;
			}
			TaskImpl other = (TaskImpl) obj;
			if (name == null) {
				return other.name == null;
			}
			return name.equals(other.name);
		}

		@Override
		public String toString() {
			return "Task [name=" + name + "]";
		}

		/**
		 * Simulates ten seconds of work.
		 *
		 * @return always true
		 */
		@Override
		public boolean execute() {
			try {
				TimeUnit.SECONDS.sleep(10);
			} catch (InterruptedException e) {
				// Keep the interruption visible to the worker running us.
				Thread.currentThread().interrupt();
			}
			return true;
		}

	}
}

 

参考: http://www.eecs.harvard.edu/~mdw/proj/seda/

 

下面添加的这些代码仅作个人记录,其正确性尚待验证(日期:2017/05/12)。

代码基于JDK1.8

/**
 * <pre>
 * Copyright baidu.com CDC [2000-2017]
 * </pre>
 */
package org.kanpiaoxue.seda;

/**
 * SEDA queue interface.
 *
 * @param <E> type of the elements held by the queue
 * @author kanpiaoxue
 * @version 1.0
 * Create Time 2017-05-12 15:34:24
 */
public interface SEDAQueue<E> {
    /**
     * Puts an element into the queue. If the element is already present, the
     * new element is discarded.
     *
     * @param e the element to put
     * @throws InterruptedException declared for implementations that may block
     */
    void put(E e) throws InterruptedException;

    /**
     * Makes the queue start working on an element using the given worker.
     *
     * @param work the {@code SEDAWork} instance that processes the element
     * @throws InterruptedException declared for implementations that may block
     */
    void work(SEDAWork<E> work) throws InterruptedException;
}

 

 

/**
 * <pre>
 * Copyright baidu.com CDC [2000-2017]
 * </pre>
 */
package org.kanpiaoxue.seda;

/**
 * SEDA state machine service: decides whether a task is in a correct state.
 *
 * <p>
 * The interface has a single abstract method and is meant to be implemented
 * with a lambda or method reference; {@code @FunctionalInterface} makes the
 * compiler enforce that shape.
 * </p>
 *
 * @param <E> type of the task being checked
 * @author kanpiaoxue
 * @version 1.0
 * Create Time 2017-05-12 17:06:36
 */
@FunctionalInterface
public interface SEDAStateMachineService<E> {
    /**
     * Checks the state of a task.
     *
     * @param e the task
     * @return {@code true} if the task is in a correct state
     */
    boolean isCorrectState(E e);
}

 

 

 

/**
 * <pre>
 * Copyright baidu.com CDC [2000-2017]
 * </pre>
 */
package org.kanpiaoxue.seda;

/**
 * Work callback of the SEDA queue: the action applied to one element.
 *
 * <p>
 * The interface has a single abstract method and is meant to be implemented
 * with a lambda or method reference; {@code @FunctionalInterface} makes the
 * compiler enforce that shape.
 * </p>
 *
 * @param <E> type of the element being worked on
 * @author kanpiaoxue
 * @version 1.0
 * Create Time 2017-05-12 16:45:39
 */
@FunctionalInterface
public interface SEDAWork<E> {
    /**
     * Works on the given element.
     *
     * @param e the element to work on
     */
    void work(E e);
}

 

 

/**
 * <pre>
 * Copyright baidu.com CDC [2000-2017]
 * </pre>
 */
package org.kanpiaoxue.seda;

import org.kanpiaoxue.seda.impl.SEDAQueueWithStateMachineImpl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Demo for {@link SEDAQueue}: one producer puts strings into the queue every
 * ten seconds while four consumers work on them.
 *
 * @author kanpiaoxue
 * @version 1.0
 * Create Time 2017-05-12 15:54:39
 */
public class Test {
    private static final Logger LOGGER = LoggerFactory.getLogger(Test.class);

    /**
     * Starts the producer and the consumers, then waits for the executor to
     * terminate.
     *
     * @param args ignored
     * @throws Exception if waiting for the executor is interrupted
     */
    public static void main(String[] args) throws Exception {

        // The state machine rejects every element containing "8".
        final SEDAQueue<String> queue = new SEDAQueueWithStateMachineImpl<>(e -> {
            LOGGER.debug("{} was processed by stateMachine", e);
            return !e.contains("8");
        });
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(() -> {
            Thread.currentThread().setName("Producer-0");
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    List<String> list = Lists.newArrayList();
                    for (int i = 1; i < 10; i++) {
                        String el = "hello" + i;
                        if (i % 7 == 0) {
                            el = "world";
                        }
                        list.add(el);
                    }

                    for (String el : list) {
                        queue.put(el);
                        LOGGER.info("{} was put into queue", el);
                    }

                    long sleepTime = 10L;
                    // SLF4J only substitutes "{}"; a "%s" would be printed literally.
                    LOGGER.info("{} will sleep {} seconds", Thread.currentThread().getName(), sleepTime);
                    TimeUnit.SECONDS.sleep(sleepTime);
                } catch (InterruptedException ex) {
                    // Restore the flag so the loop condition stops the producer.
                    Thread.currentThread().interrupt();
                } catch (Exception ex) {
                    LOGGER.error("Producer failed to put elements into the queue", ex);
                }
            }
        });

        for (int i = 0, j = 4; i < j; i++) {

            final int num = i;
            executor.execute(() -> {
                Thread.currentThread().setName("Consumer-" + num);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        queue.work(w -> {
                            LOGGER.info("[{}]{} was taken from queue", Thread.currentThread().getName(), w);

                            try {
                                TimeUnit.SECONDS.sleep(1L);
                            } catch (InterruptedException e1) {
                                // Keep the interruption visible to the consumer loop.
                                Thread.currentThread().interrupt();
                            }
                        });
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e1) {
                        LOGGER.error("Consumer failed to work on an element", e1);
                    }
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.DAYS);
    }
}

 

 

package org.kanpiaoxue.seda.impl;

import org.kanpiaoxue.seda.SEDAQueue;
import org.kanpiaoxue.seda.SEDAStateMachineService;
import org.kanpiaoxue.seda.SEDAWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * {@link SEDAQueue} implementation backed by a state machine service.
 *
 * <p>
 * An element is accepted only if no equal element is currently queued or being
 * worked on, and the state machine service reports a correct state for it.
 * Membership is tracked in a concurrent map from the moment an element is
 * accepted until the work on it has finished.
 * </p>
 *
 * @param <E> type of the elements held by the queue
 * @author kanpiaoxue
 * @version 1.0
 * Create Time 2017-05-12 14:55:24
 */
public class SEDAQueueWithStateMachineImpl<E> implements SEDAQueue<E> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SEDAQueueWithStateMachineImpl.class);
    /** Shared dummy value; the membership map is only used as a concurrent set. */
    private static final Object PRESENT = new Object();
    /** Elements that are queued or currently being worked on. */
    private final ConcurrentMap<E, Object> outerQueue = new ConcurrentHashMap<>();
    /** Accepted elements waiting for a worker. */
    private final LinkedBlockingQueue<E> workingQueue = new LinkedBlockingQueue<>();
    private final SEDAStateMachineService<E> sedaStateMachineService;

    /**
     * Creates a queue that validates elements with the given service.
     *
     * @param sedaStateMachineService the state machine service; must not be null
     * @throws NullPointerException if {@code sedaStateMachineService} is null
     */
    public SEDAQueueWithStateMachineImpl(SEDAStateMachineService<E> sedaStateMachineService) {
        super();
        this.sedaStateMachineService =
                Objects.requireNonNull(sedaStateMachineService, "sedaStateMachineService is null");
    }

    /**
     * Puts an element into the queue. The element is discarded if an equal
     * element is already queued or being worked on, or if the state machine
     * service reports an incorrect state.
     *
     * @param e the element to put; must not be null
     * @throws InterruptedException if interrupted while enqueueing
     * @throws NullPointerException if {@code e} is null
     */
    @Override
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e, "e is null");
        if (null != outerQueue.putIfAbsent(e, PRESENT)) {
            LOGGER.warn("{} was found in the queue. It was abandoned!", e);
            return;
        }

        boolean enqueued = false;
        try {
            if (!sedaStateMachineService.isCorrectState(e)) {
                LOGGER.warn("{}'s state is not correct. It was abandoned!", e);
                return;
            }
            workingQueue.put(e);
            enqueued = true;
        } finally {
            // Roll back the membership marker whenever the element did not
            // reach the working queue (rejected, state check threw, or put was
            // interrupted); otherwise the element would be refused forever.
            if (!enqueued) {
                outerQueue.remove(e);
            }
        }
    }

    /**
     * Takes the next element, blocking until one is available, and hands it to
     * the given worker. The element becomes eligible for {@link #put(Object)}
     * again once the worker returns or throws.
     *
     * @param work the worker to apply; must not be null
     * @throws InterruptedException if interrupted while waiting for an element
     * @throws NullPointerException if {@code work} is null
     */
    @Override
    public void work(SEDAWork<E> work) throws InterruptedException {
        // Check before taking: a null worker must not consume an element.
        Objects.requireNonNull(work, "work is null");
        E e = workingQueue.take();
        try {
            work.work(e);
        } finally {
            outerQueue.remove(e);
        }
    }
}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics