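The Dispatcher lives in its own thread and is the container for all concurrent Jobs. A batch is processed as soon as either of two conditions is met, whichever comes first:
1. mIntervalTime milliseconds have elapsed since the last batch was run under this condition, and the batch contains at least one Job.
2. The batch size reaches mJobBatchMaxSize.
If mJobBatchMaxSize is set to 1, TrainServlet degenerates into ClassicServlet. This is an important property of dynamic batching, and it is explained later.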
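The two trigger conditions can be sketched as a small helper. This is a minimal illustration, not the article's Dispatcher: the class name BatchTrigger, its methods, and the use of plain String IDs in place of Job objects are assumptions made for the example, and the clock value is passed in explicitly so the behaviour is easy to follow. It takes one reading of condition 1, in which the timer restarts only when a timed flush actually happens.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the two flush conditions.
class BatchTrigger {
    private final long intervalMs;  // stands in for mIntervalTime
    private final int maxSize;      // stands in for mJobBatchMaxSize
    private final List<String> batch = new ArrayList<String>();
    private long lastTimedFlushMs;  // time of the last flush caused by condition 1

    BatchTrigger(long intervalMs, int maxSize, long startMs) {
        this.intervalMs = intervalMs;
        this.maxSize = maxSize;
        this.lastTimedFlushMs = startMs;
    }

    // Condition 2: flush as soon as the batch reaches maxSize.
    // Returns the batch to process, or null if it should keep filling.
    List<String> add(String jobId) {
        batch.add(jobId);
        return batch.size() >= maxSize ? drain() : null;
    }

    // Condition 1: flush when intervalMs has elapsed since the last timed
    // flush and at least one job is waiting. Returns null otherwise.
    List<String> tick(long nowMs) {
        if (batch.isEmpty() || nowMs - lastTimedFlushMs < intervalMs) {
            return null;
        }
        lastTimedFlushMs = nowMs;
        return drain();
    }

    // Hands over the current batch and starts a new, empty one.
    private List<String> drain() {
        List<String> ready = new ArrayList<String>(batch);
        batch.clear();
        return ready;
    }
}
```

With maxSize set to 1, every call to add() returns a one-element batch immediately, which corresponds to the degenerate case in which TrainServlet behaves like ClassicServlet.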
The final part of the solution is the Worker class:
//Worker.java
package train;

import java.sql.*;
import java.util.*;

public class Worker implements Runnable {
    List<Job> mJobs;            // Jobs that make up this batch
    Map<String, Job> mJobMap;   // Helper member mapping each job ID to its Job

    protected Worker(List<Job> jobs) {
        mJobs = jobs;
        mJobMap = new HashMap<String, Job>();
    }
    private void Process() {
        boolean isError = false;
        StringBuffer sqlBuff = new StringBuffer(
            "Select id,name ,descr,views from trentry where id in ");
        StringBuffer whereClause = CreateWhereClause(); // e.g. "(3343,22222,5555)"
        sqlBuff.append(whereClause);
        /* Now the SQL statement is fully formed and looks like:
           Select id,name ,descr,views from trentry where id in (3343,22222,5555).
           This allows us to fetch several user requests in one shot */
        Connection connection = null;
        Statement stmt = null;
        try {
            // The commit()/rollback() calls below assume that auto-commit
            // is disabled on the connection returned here
            connection = Util.getDBConnection();
            stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(sqlBuff.toString());
            System.out.println(sqlBuff.toString());
            while (rs.next()) {
                int id = rs.getInt("ID");
                String name = rs.getString("NAME");
                String descr = rs.getString("DESCR");
                int views = rs.getInt("VIEWS");
                Job job = (Job) mJobMap.get(String.valueOf(id));
                if (job == null) {
                    // No job is keyed by this ID (for example mID was "007"); skip the row
                    continue;
                }
                //Populate instance of the Job with data retrieved from database
                job.mName = name;
                job.mDescr = descr;
                job.mViews = views;
            }
            rs.close();
            String sqlViewStr = "update trentry set views = views+1 where id in " +
                whereClause;
            // The same trick for the Update statement
            stmt.executeUpdate(sqlViewStr);
        }
        catch (SQLException ex) {
            ex.printStackTrace(); // Do not swallow the cause silently
            isError = true;
        }
        finally {
            // Separate try blocks: a failure in one step must not skip the next
            try {
                if (stmt != null) {
                    stmt.close();
                }
            }
            catch (SQLException ex1) { isError = true; }
            try {
                if (connection != null) {
                    if (isError) {
                        connection.rollback();
                    }
                    else {
                        connection.commit();
                    }
                }
            }
            catch (SQLException ex2) { isError = true; }
            finally {
                try {
                    if (connection != null) {
                        connection.close();
                    }
                }
                catch (SQLException ex3) { ex3.printStackTrace(); }
            }
        }
        FinishJobs(isError);
    }
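    // Builds the "(id1,id2,...)" list and registers every job in mJobMap under its ID.
    // The IDs are concatenated into the SQL text as-is, so they must already have
    // been validated as numeric. If two jobs carry the same ID, the later one
    // replaces the earlier one in the map and the earlier job is never populated.
    private StringBuffer CreateWhereClause() {
        StringBuffer clause = new StringBuffer("(");
        for (int i = 0; i < mJobs.size(); i++) {
            Job job = (Job) mJobs.get(i);
            String id = job.mID;
            if (i != 0) {
                clause.append(",");
            }
            clause.append(id);
            mJobMap.put(id, job);
        }
        clause.append(")");
        return clause;
    }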
    private void FinishJobs(boolean isError) {
        for (int i = 0; i < mJobs.size(); i++) {
            Job job = (Job) mJobs.get(i);
            if (isError) {
                job.mHasFailed = true; //Rudimentary error handling
            }
            /* Wake up the TrainServlet to deliver the page to the browser */
            job.mJobThread.interrupt();
        }
    }

    public void run() {
        Process();
    }
}
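The listing splices the job IDs directly into the SQL text, so the statement is only as safe as the IDs are. One possible alternative, which is not part of the article's code, is to generate one ? placeholder per job and bind the values through a PreparedStatement. The class name InClause and both of its methods are hypothetical, and the sketch assumes the IDs are numeric strings, as the rs.getInt("ID") call suggests.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper: builds a parameterized IN list instead of concatenating IDs.
class InClause {
    // Returns "(?,?,?)" with one placeholder per ID; count must be at least 1.
    static String placeholders(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("empty batch");
        }
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < count; i++) {
            if (i != 0) {
                sb.append(",");
            }
            sb.append("?");
        }
        return sb.append(")").toString();
    }

    // Prepares the batch select and binds each ID as an int.
    // The caller is responsible for closing the returned statement.
    static PreparedStatement prepareSelect(Connection c, List<String> ids)
            throws SQLException {
        // Parse first, so a non-numeric ID fails before any statement is opened
        int[] values = new int[ids.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = Integer.parseInt(ids.get(i));
        }
        PreparedStatement ps = c.prepareStatement(
            "select id,name,descr,views from trentry where id in "
                + placeholders(values.length));
        try {
            for (int i = 0; i < values.length; i++) {
                ps.setInt(i + 1, values[i]); // JDBC parameters are 1-based
            }
        } catch (SQLException e) {
            ps.close();
            throw e;
        }
        return ps;
    }
}
```

The trade-off is that the SQL text now varies with the batch size rather than with the IDs themselves, so a database that caches statements sees at most mJobBatchMaxSize distinct shapes.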
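The Worker class builds the SQL statements, combines the Jobs of a batch into a single round trip to the database, populates the Job instances with the data that comes back, and wakes up the Job threads.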
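The wake-up step relies on Thread.interrupt(): a thread blocked in sleep() receives an InterruptedException, which it can treat as the signal that its Job has been filled in. The sketch below shows that handshake in isolation. The class WakeUpDemo and its members are hypothetical stand-ins, not the article's TrainServlet or Job, and it assumes the request thread waits in sleep() with a timeout.

```java
// Hypothetical demo of waking a waiting request thread with interrupt().
class WakeUpDemo {
    // Stand-in for a Job field: written by the worker, read by the waiter.
    static volatile String result;

    // Blocks like a request thread waiting for its batch. Returns true if it
    // was woken by interrupt() before the timeout expired.
    static boolean awaitWakeUp(long timeoutMs) {
        try {
            Thread.sleep(timeoutMs);
            return false; // timed out: nobody finished the job
        } catch (InterruptedException e) {
            return true;  // interrupted: the worker signalled completion
        }
    }

    // Starts a worker that fills in the result and interrupts the caller,
    // then waits for that signal. Returns the result, or null on timeout.
    static String runOnce() {
        final Thread requestThread = Thread.currentThread();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                result = "row for id 42";    // fill in the data first
                requestThread.interrupt();   // then wake the waiting thread
            }
        });
        worker.start();
        boolean woken = awaitWakeUp(5000L);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return woken ? result : null;
    }
}
```

Because interrupt() sets the thread's interrupt status even when the target is not yet sleeping, the signal is not lost if the worker finishes before the request thread starts to wait: the next sleep() call throws immediately.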
