I. Overview
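URI scheduling, put simply, means providing two operations: one that hands out URIs and one that accepts new ones. Crawler threads get the next URI to fetch from the scheduler. After fetching and analyzing a page, they add any URIs that should be crawled next back into the scheduler, where those URIs wait their turn. Heritrix's CrawlController manages the scheduler through the field

    private transient Frontier frontier;

Heritrix ships with several frontier (scheduler) implementations. You can also modify one or write an entirely new one to fit your needs, and then select your own implementation class as the frontier in order.xml. The default implementation is BdbFrontier, a frontier persisted with BDB (Berkeley DB Java Edition). Here is an example of its configuration: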
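    <newObject name="frontier" class="org.archive.crawler.frontier.BdbFrontier">
      <float name="delay-factor">4.0</float>
      <integer name="max-delay-ms">20000</integer>
      <integer name="min-delay-ms">2000</integer>
      <integer name="respect-crawl-delay-up-to-secs">300</integer>
      <integer name="max-retries">30</integer>
      <long name="retry-delay-seconds">900</long>
      <integer name="preference-embed-hops">1</integer>
      <integer name="total-bandwidth-usage-KB-sec">0</integer>
      <integer name="max-per-host-bandwidth-usage-KB-sec">0</integer>
      <string name="queue-assignment-policy">org.archive.crawler.frontier.HostnameQueueAssignmentPolicy</string>
      <string name="force-queue-assignment"></string>
      <boolean name="pause-at-start">false</boolean>
      <boolean name="pause-at-finish">false</boolean>
      <boolean name="source-tag-seeds">false</boolean>
      <boolean name="recovery-log-enabled">true</boolean>
      <boolean name="hold-queues">true</boolean>
      <integer name="balance-replenish-amount">3000</integer>
      <integer name="error-penalty-amount">100</integer>
      <long name="queue-total-budget">-1</long>
      <string name="cost-policy">org.archive.crawler.frontier.ZeroCostAssignmentPolicy</string>
      <long name="snooze-deactivate-ms">300000</long>
      <integer name="target-ready-backlog">50</integer>
      <string name="uri-included-structure">org.archive.crawler.util.BdbUriUniqFilter</string>
      <boolean name="dump-pending-at-close">false</boolean>
    </newObject>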
Several of these attributes come up in the code analysis below, which shows how they are used.
II. Interface Definition
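First, a brief explanation of the main methods:
initialize: entry point for initializing the frontier
next: called by a crawler thread (ToeThread) to get the next URI to fetch
schedule: called by a crawler thread to add a URI that should be crawled to the frontier
finished: called by a crawler thread to process the result of fetching a URI
loadSeeds: loads the seed URIs
start: starts the frontier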
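The sketch below makes the division of labor among these methods concrete. It is a minimal, hypothetical version of the same contract. The interface name SimpleFrontier, the plain String URIs, and the single FIFO queue are simplifying assumptions. Heritrix's real Frontier interface works with CandidateURI/CrawlURI objects and many per-class queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified version of the Frontier contract (String URIs only). */
interface SimpleFrontier {
    void initialize(List<String> seeds);        // prepare internal state
    void loadSeeds();                           // schedule the seed URIs
    void start();                               // allow next() to hand out work
    String next();                              // crawler thread asks for work; null if none
    void schedule(String uri);                  // crawler thread adds a discovered URI
    void finished(String uri, boolean success); // crawler thread reports a fetch result
}

/** Single-threaded FIFO implementation with duplicate suppression. */
class FifoFrontier implements SimpleFrontier {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Set<String> alreadyIncluded = new HashSet<>();
    private List<String> seeds = Collections.emptyList();
    private boolean started;
    int succeeded;
    int failed;

    @Override public void initialize(List<String> seeds) { this.seeds = new ArrayList<>(seeds); }
    @Override public void loadSeeds() { seeds.forEach(this::schedule); }
    @Override public void start() { started = true; }

    @Override
    public String next() {
        // Nothing is handed out before start() is called.
        return started ? pending.pollFirst() : null;
    }

    @Override
    public void schedule(String uri) {
        // Set.add returns false for a URI that was already scheduled once.
        if (alreadyIncluded.add(uri)) {
            pending.addLast(uri);
        }
    }

    @Override
    public void finished(String uri, boolean success) {
        if (success) succeeded++; else failed++;
    }
}
```

A real frontier does far more in finished() (retries, politeness delays, statistics). Here it only counts results so that the call order initialize → loadSeeds → start → next/schedule/finished stays visible.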
III. Main Member Variables (BdbFrontier)
1. protected transient UriUniqFilter alreadyIncluded
The field is declared in WorkQueueFrontier, which creates it through the abstract factory method

    protected abstract UriUniqFilter createAlreadyIncluded() throws IOException

BdbFrontier's implementation:

    /**
     * Create a UriUniqFilter that will serve as record
     * of already seen URIs.
     *
     * @return A UURISet that will serve as a record of already seen URIs
     * @throws IOException
     */
    protected UriUniqFilter createAlreadyIncluded() throws IOException {
        UriUniqFilter uuf;
        String c = null;
        try {
            c = (String)getAttribute(null, ATTR_INCLUDED);
        } catch (AttributeNotFoundException e) {
            // Do default action if attribute not in order.
        }
        // TODO: avoid all this special-casing; enable some common
        // constructor interface usable for all alt implemenations
        if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {
            uuf = this.controller.isCheckpointRecover()?
                deserializeAlreadySeen(BloomUriUniqFilter.class,
                    this.controller.getCheckpointRecover().getDirectory()):
                new BloomUriUniqFilter();
        } else if (c!=null && c.equals(MemFPMergeUriUniqFilter.class.getName())) {
            // TODO: add checkpointing for MemFPMergeUriUniqFilter
            uuf = new MemFPMergeUriUniqFilter();
        } else if (c!=null && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {
            // TODO: add checkpointing for DiskFPMergeUriUniqFilter
            uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());
        } else {
            // Assume its BdbUriUniqFilter.
            uuf = this.controller.isCheckpointRecover()?
                deserializeAlreadySeen(BdbUriUniqFilter.class,
                    this.controller.getCheckpointRecover().getDirectory()):
                new BdbUriUniqFilter(this.controller.getBdbEnvironment());
            if (this.controller.isCheckpointRecover()) {
                // If recover, need to call reopen of the db.
                try {
                    ((BdbUriUniqFilter)uuf).
                        reopen(this.controller.getBdbEnvironment());
                } catch (DatabaseException e) {
                    throw new IOException(e.getMessage());
                }
            }
        }
        uuf.setDestination(this);
        return uuf;
    }

By default, a BdbUriUniqFilter is instantiated. This is the final else branch, taken when the configured class is none of the alternatives.
BdbUriUniqFilter uses a BDB database to deduplicate URLs, with the URL's fingerprint as the key. It is fairly simple, so I won't go into detail.
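The idea behind this filter is to store a fixed-size fingerprint per URL instead of the URL itself, and to treat "key already present" as "already seen". It can be sketched in a few lines. In this hypothetical version, an in-memory HashSet<Long> stands in for the BDB database, and the fingerprint is the first 8 bytes of a SHA-256 digest. Both are assumptions for illustration, not the fingerprint function Heritrix itself uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical fingerprint-based "already seen" filter (in-memory stand-in for BDB). */
class FingerprintUniqFilter {
    private final Set<Long> seen = new HashSet<>();

    /** 64-bit fingerprint: the first 8 bytes of SHA-256 over the URI's UTF-8 bytes. */
    static long fingerprint(String uri) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(uri.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest, 0, 8).getLong();
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to provide SHA-256.
            throw new IllegalStateException(e);
        }
    }

    /** Returns true the first time a URI is offered, false for every repeat. */
    boolean addIfNew(String uri) {
        return seen.add(fingerprint(uri));
    }

    int count() {
        return seen.size();
    }
}
```

With any fingerprint scheme, two different URLs can in principle collide on the same key, and the second one is then wrongly treated as already seen. With 64-bit keys this is rare, and the trade-off buys a much smaller store than keeping full URL strings.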
2. protected transient ObjectIdentityCache<String,WorkQueue> allQueues
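This member holds all WorkQueues. By default it is implemented by ObjectIdentityBdbCache, a large-capacity object cache persisted in BDB that behaves much like a Map. In my view it is a classic example of a single-node object cache, and its code is quite interesting: it makes use of Java's four reference types. It is worth reading if you are interested.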
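The sketch below shows the general pattern: recently used objects stay in memory through soft references, so the garbage collector may reclaim them under memory pressure, and a miss reloads them from the persistent store. It is a hypothetical simplification, not ObjectIdentityBdbCache itself. The loader function stands in for reading from BDB, and writing modified objects back to disk, which a real cache must handle, is omitted.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical soft-reference cache in front of a slower store (not Heritrix code). */
class SoftRefCache<V> {
    /** SoftReference that remembers its key so cleared entries can be removed. */
    private static final class KeyedRef<T> extends SoftReference<T> {
        final String key;
        KeyedRef(String key, T value, ReferenceQueue<? super T> queue) {
            super(value, queue);
            this.key = key;
        }
    }

    private final Map<String, KeyedRef<V>> memory = new HashMap<>();
    private final ReferenceQueue<V> cleared = new ReferenceQueue<>();
    private final Function<String, V> loader; // stand-in for reading from BDB

    SoftRefCache(Function<String, V> loader) {
        this.loader = loader;
    }

    /** Returns the in-memory instance if it is still alive, otherwise reloads it. */
    synchronized V get(String key) {
        expungeCleared();
        KeyedRef<V> ref = memory.get(key);
        V value = (ref == null) ? null : ref.get();
        if (value == null) {
            value = loader.apply(key);
            memory.put(key, new KeyedRef<>(key, value, cleared));
        }
        return value;
    }

    /** Drops map entries whose referents the GC has already reclaimed. */
    private void expungeCleared() {
        Reference<? extends V> r;
        while ((r = cleared.poll()) != null) {
            KeyedRef<?> dead = (KeyedRef<?>) r;
            // Only remove if the map still points at this exact dead reference.
            if (memory.get(dead.key) == dead) {
                memory.remove(dead.key);
            }
        }
    }
}
```

A softly referenced object is never cleared while something still holds a strong reference to it. So, as long as a caller keeps using a queue object, every get() for that key returns the same instance. This identity guarantee is what makes such a cache safe for mutable objects like work queues.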
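The keys of this cache are generated by the method public String getClassKey(CandidateURI cauri). Every URL maps to a class key, typically its hostname or a hashcode of its IP. The exact rule is defined by the abstract class QueueAssignmentPolicy; to implement your own queue-assignment strategy, extend that class.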
    /**
     * @param cauri CrawlURI we're to get a key for.
     * @return a String token representing a queue
     */
    public String getClassKey(CandidateURI cauri) {
        String queueKey = (String)getUncheckedAttribute(cauri,
            ATTR_FORCE_QUEUE);
        if ("".equals(queueKey)) {
            // no forced override
            QueueAssignmentPolicy queueAssignmentPolicy =
                getQueueAssignmentPolicy(cauri);
            queueKey =
                queueAssignmentPolicy.getClassKey(this.controller, cauri);
        }
        return queueKey;
    }

    protected QueueAssignmentPolicy getQueueAssignmentPolicy(CandidateURI cauri) {
        String clsName = (String)getUncheckedAttribute(cauri,
                ATTR_QUEUE_ASSIGNMENT_POLICY);
        try {
            return (QueueAssignmentPolicy) Class.forName(clsName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Configuration:

    org.archive.crawler.frontier.HostnameQueueAssignmentPolicy
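The logic of getClassKey() above can be mirrored in a self-contained sketch: a non-empty forced queue key wins, and otherwise the configured policy derives the key from the URI. ClassKeyPolicy, HostKeyPolicy and ClassKeys are hypothetical stand-ins for QueueAssignmentPolicy and HostnameQueueAssignmentPolicy. The key format here (lower-cased host, plus "#port" when a port is given explicitly) is an assumption, not necessarily the format Heritrix produces.

```java
import java.net.URI;

/** Hypothetical stand-in for QueueAssignmentPolicy: maps a URI to a queue (class) key. */
interface ClassKeyPolicy {
    String classKey(URI uri);
}

/** Groups URIs by lower-cased host; an explicit port is appended after '#'. */
class HostKeyPolicy implements ClassKeyPolicy {
    @Override
    public String classKey(URI uri) {
        String host = uri.getHost() == null ? "" : uri.getHost().toLowerCase();
        return uri.getPort() == -1 ? host : host + "#" + uri.getPort();
    }
}

class ClassKeys {
    /** Mirrors getClassKey(): a non-empty forced key overrides the policy. */
    static String classKeyFor(URI uri, String forcedKey, ClassKeyPolicy policy) {
        if (forcedKey != null && !forcedKey.isEmpty()) {
            return forcedKey;
        }
        return policy.classKey(uri);
    }
}
```

Because every URI with the same key lands in the same work queue, the choice of key decides what "one site" means for politeness. Keying by host keeps www.example.com and example.com apart. A custom policy could merge them, or split one host across several queues.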
3. protected BlockingQueue<String> readyClassQueues
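Holds the class keys of queues whose first item is ready to be handed out. When a ToeThread calls next(), the frontier tries to take the first class key from this queue and looks up the corresponding WorkQueue in allQueues. It then returns that WorkQueue's first CrawlURI to the ToeThread for fetching.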
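The hand-out path just described can be sketched with a BlockingQueue of class keys plus a map from class key to per-class queue. All names here are hypothetical. The sketch makes two simplifying assumptions. First, next() only peeks at the head URI, and a separate finished() removes it. Second, a class key that has been handed out is parked in an in-process set (compare inProcessQueues below), so one queue never has two URIs in flight at once.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of readyClassQueues + allQueues (not Heritrix code). */
class ReadyQueueModel {
    private final Map<String, Deque<String>> allQueues = new HashMap<>();
    private final BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();
    private final Set<String> inProcess = new HashSet<>();

    /** Adds a URI to its class queue; an idle queue that gains its first URI becomes ready. */
    synchronized void schedule(String classKey, String uri) {
        Deque<String> q = allQueues.computeIfAbsent(classKey, k -> new ArrayDeque<>());
        q.addLast(uri);
        if (q.size() == 1 && !inProcess.contains(classKey)) {
            readyClassQueues.add(classKey);
        }
    }

    /** Waits up to timeoutMs for a ready class key and returns the head URI of that queue. */
    String next(long timeoutMs) {
        String key;
        try {
            // Wait without holding the lock so schedule()/finished() can proceed.
            key = readyClassQueues.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
        if (key == null) return null;
        synchronized (this) {
            inProcess.add(key); // the queue is busy until finished()
            return allQueues.get(key).peekFirst();
        }
    }

    /** Removes the fetched URI from its queue and makes the queue ready again if non-empty. */
    synchronized void finished(String classKey) {
        inProcess.remove(classKey);
        Deque<String> q = allQueues.get(classKey);
        q.pollFirst();
        if (!q.isEmpty()) readyClassQueues.add(classKey);
    }
}
```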
4. protected int targetSizeForReadyQueues;
Target (minimum) size at which to keep readyClassQueues.
5. protected transient Semaphore readyFiller = new Semaphore(1)
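A single-permit semaphore, used when next() tries to move inactive queues into readyClassQueues, so that only one thread performs that refill at a time.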
6. protected Queue<String> inactiveQueues
Similar to readyClassQueues, but holds the class keys of inactive work queues.
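Items 4 to 6 work together. When readyClassQueues holds fewer class keys than the target size, next() can activate inactive queues to top it up, and the single-permit semaphore lets only one thread do this at a time. The sketch below models just that refill step under stated assumptions. The class and method names are hypothetical. A thread that fails to get the permit simply skips the refill rather than waiting, which is a design choice of this sketch.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical refill step for readyClassQueues (not Heritrix code). */
class ReadyRefiller {
    final BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();
    final Queue<String> inactiveQueues = new ConcurrentLinkedQueue<>();
    final Semaphore readyFiller = new Semaphore(1); // at most one refiller at a time
    final int targetSizeForReadyQueues;

    ReadyRefiller(int targetSizeForReadyQueues) {
        this.targetSizeForReadyQueues = targetSizeForReadyQueues;
    }

    /** Activates inactive class keys until the ready queue reaches its target size; returns how many were moved. */
    int refillIfNeeded() {
        if (!readyFiller.tryAcquire()) {
            return 0; // another thread is already refilling; don't block
        }
        try {
            int activated = 0;
            while (readyClassQueues.size() < targetSizeForReadyQueues) {
                String key = inactiveQueues.poll();
                if (key == null) break; // nothing left to activate
                readyClassQueues.add(key);
                activated++;
            }
            return activated;
        } finally {
            readyFiller.release();
        }
    }
}
```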
7. protected Queue<String> retiredQueues
Class keys of retired work queues. "Retired" here does not mean waiting to be retried; the Javadoc says:
'retired' queues, no longer considered for activation
8. protected Bag inProcessQueues = BagUtils.synchronizedBag(new HashBag());
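Class keys of work queues that have been handed out but not yet finished. You can think of it as a HashSet, except that a Bag (from Apache Commons Collections) also counts how many times each element has been added.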
9. protected SortedSet<WorkQueue> snoozedClassQueues;
All per-class queues held in snoozed state, sorted by wake time. In other words, these are sleeping work queues ordered by wake-up time, which determines when each queue becomes eligible again.
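A set of queues sorted by wake time can be modeled with a TreeSet whose comparator orders entries by wake-up time and breaks ties by class key. The tie-break matters: without it, two queues due at the same moment would compare as equal, and the set would keep only one of them. Waking up then means removing every entry at the front of the set whose time has passed. SnoozedQueue and SnoozeSet are hypothetical names; in Heritrix the elements are WorkQueue objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical snoozed-queue entry: a class key plus the time it may wake up. */
final class SnoozedQueue {
    final String classKey;
    final long wakeTimeMs;

    SnoozedQueue(String classKey, long wakeTimeMs) {
        this.classKey = classKey;
        this.wakeTimeMs = wakeTimeMs;
    }
}

class SnoozeSet {
    // Order by wake time; tie-break on class key so equal wake times are both kept.
    private final TreeSet<SnoozedQueue> snoozed = new TreeSet<>(
            Comparator.comparingLong((SnoozedQueue q) -> q.wakeTimeMs)
                      .thenComparing(q -> q.classKey));

    synchronized void snooze(String classKey, long nowMs, long delayMs) {
        snoozed.add(new SnoozedQueue(classKey, nowMs + delayMs));
    }

    /** Removes and returns the keys of all queues whose wake time is <= nowMs, earliest first. */
    synchronized List<String> wakeDue(long nowMs) {
        List<String> woken = new ArrayList<>();
        while (!snoozed.isEmpty() && snoozed.first().wakeTimeMs <= nowMs) {
            woken.add(snoozed.pollFirst().classKey);
        }
        return woken;
    }

    synchronized int size() {
        return snoozed.size();
    }
}
```

Because the set is sorted, wakeDue() only inspects the earliest entries and stops at the first one still in the future. It never scans all snoozed queues.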
Initialization of these queues:
    /**
     * Set up the various queues-of-queues used by the frontier. Override
     * in implementing subclasses to reduce or eliminate risk of queues
     * growing without bound.
     */
    protected void initQueuesOfQueues() {
        // small risk of OutOfMemoryError: if 'hold-queues' is false,
        // readyClassQueues may grow in size without bound
        readyClassQueues = new LinkedBlockingQueue<String>();
        // risk of OutOfMemoryError: in large crawls,
        // inactiveQueues may grow in size without bound
        inactiveQueues = new LinkedBlockingQueue<String>();
        // risk of OutOfMemoryError: in large crawls with queue max-budgets,
        // inactiveQueues may grow in size without bound
        retiredQueues = new LinkedBlockingQueue<String>();
        // small risk of OutOfMemoryError: in large crawls with many
        // unresponsive queues, an unbounded number of snoozed queues
        // may exist
        snoozedClassQueues =
            Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
    }
IV. Main Methods and Their Flow
[Figure: scheduling sequence diagram]
[Figure: schedule flowchart]
[Figure: next flowchart]
[Figure: finished(CrawlURI curi) flowchart]
Reposted from: https://blog.csdn.net/wliufu/article/details/84433280