Heritrix Source Code Analysis: URI Scheduling in Detail
Published: 2021-09-16 04:36:31   Category: Technical Articles


I. Overview

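Put simply, URI scheduling provides one method that hands out URIs and another that adds them. Crawl threads get the next URI to fetch from the scheduler, and after a page has been fetched and analyzed, any URIs that should be crawled further are added back to the scheduler, where they wait their turn. Heritrix's CrawlController manages the scheduler through a field declared as

    private transient Frontier frontier;

Heritrix ships several scheduler implementations, and you can also modify one or write an entirely new one to suit your needs; in order.xml, the frontier can be set to your own implementation class. The default implementation is BdbFrontier, a scheduler persisted with Berkeley DB (BDB). Here is an example of its configuration: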

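    <newObject name="frontier" class="org.archive.crawler.frontier.BdbFrontier">
      <float name="delay-factor">4.0</float>
      <integer name="max-delay-ms">20000</integer>
      <integer name="min-delay-ms">2000</integer>
      <integer name="respect-crawl-delay-up-to-secs">300</integer>
      <integer name="max-retries">30</integer>
      <long name="retry-delay-seconds">900</long>
      <integer name="preference-embed-hops">1</integer>
      <integer name="total-bandwidth-usage-KB-sec">0</integer>
      <integer name="max-per-host-bandwidth-usage-KB-sec">0</integer>
      <string name="queue-assignment-policy">org.archive.crawler.frontier.HostnameQueueAssignmentPolicy</string>
      <boolean name="pause-at-start">false</boolean>
      <boolean name="pause-at-finish">false</boolean>
      <boolean name="source-tag-seeds">false</boolean>
      <boolean name="recovery-log-enabled">true</boolean>
      <boolean name="hold-queues">true</boolean>
      <integer name="balance-replenish-amount">3000</integer>
      <integer name="error-penalty-amount">100</integer>
      <long name="queue-total-budget">-1</long>
      <string name="cost-policy">org.archive.crawler.frontier.ZeroCostAssignmentPolicy</string>
      <long name="snooze-deactivate-ms">300000</long>
      <integer name="target-ready-backlog">50</integer>
      <string name="uri-included-structure">org.archive.crawler.util.BdbUriUniqFilter</string>
      <boolean name="dump-pending-at-close">false</boolean>
    </newObject>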

How these configuration attributes are used will become clear in the code analysis below.
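In Heritrix 1.x order.xml files, the first three values of the configuration above (4.0, 20000 and 2000) are the delay-factor, max-delay-ms and min-delay-ms settings. Together they decide how long a queue (by default, one host) must wait before it is contacted again: a multiple of the last fetch's duration, clamped between the minimum and the maximum. The sketch below shows only that arithmetic. PolitenessDelay is a hypothetical name, and the robots.txt crawl-delay and bandwidth settings that also affect the real delay are deliberately ignored.

```java
/**
 * Hypothetical sketch of a Heritrix-1.x-style politeness delay.
 * Assumes 4.0 / 20000 / 2000 are delay-factor, max-delay-ms and min-delay-ms;
 * robots.txt crawl-delay and bandwidth limits are deliberately ignored.
 */
final class PolitenessDelay {
    static final float DELAY_FACTOR = 4.0f;   // multiples of the last fetch's duration
    static final long MAX_DELAY_MS = 20_000;  // never wait longer than this
    static final long MIN_DELAY_MS = 2_000;   // always wait at least this long

    /** How long the queue should wait after a fetch that ran from began to completed. */
    static long delayFor(long fetchBeganMs, long fetchCompletedMs) {
        long durationTaken = fetchCompletedMs - fetchBeganMs;
        long wait = (long) (DELAY_FACTOR * durationTaken);
        // Clamp into [MIN_DELAY_MS, MAX_DELAY_MS].
        return Math.min(Math.max(wait, MIN_DELAY_MS), MAX_DELAY_MS);
    }

    public static void main(String[] args) {
        System.out.println(delayFor(0, 300));   // 4.0 * 300 = 1200, raised to 2000
        System.out.println(delayFor(0, 1500));  // 4.0 * 1500 = 6000
        System.out.println(delayFor(0, 9000));  // 4.0 * 9000 = 36000, capped at 20000
    }
}
```

With these settings a slow server is automatically given more breathing room, but never more than 20 seconds, and even a very fast server is never hit more than once every 2 seconds per queue.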

II. Interface Definition

 

First, a quick explanation of the main methods:

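initialize: entry point for initializing the frontier

next: called by a crawl thread to get the next URI to fetch

schedule: called by a crawl thread to add a URI that should be crawled to the frontier

finished: called by a crawl thread to process the result of fetching a URI

loadSeeds: loads the seeds

start: starts the frontier working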
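To make the division of labor among loadSeeds, schedule, next and finished concrete, here is a toy, single-queue frontier. SimpleFrontier and InMemoryFrontier are hypothetical and much simpler than Heritrix's Frontier, which works with CandidateURI/CrawlURI objects, per-host queues, politeness and persistence; plain strings stand in for URIs here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, heavily simplified counterpart of the Frontier methods above. */
interface SimpleFrontier {
    void loadSeeds(Iterable<String> seeds); // seed the crawl
    void schedule(String uri);              // add a URI that should be crawled
    String next();                          // hand out the next URI, or null if none
    void finished(String uri);              // the crawl thread is done with this URI
}

/** One FIFO queue in memory: no per-host queues, politeness or persistence. */
final class InMemoryFrontier implements SimpleFrontier {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Set<String> alreadyIncluded = new HashSet<>(); // every URI ever scheduled
    private final Set<String> inProcess = new HashSet<>();       // handed out, not finished

    @Override
    public synchronized void loadSeeds(Iterable<String> seeds) {
        for (String seed : seeds) {
            schedule(seed);
        }
    }

    @Override
    public synchronized void schedule(String uri) {
        if (alreadyIncluded.add(uri)) { // duplicates are dropped silently
            pending.addLast(uri);
        }
    }

    @Override
    public synchronized String next() {
        String uri = pending.pollFirst();
        if (uri != null) {
            inProcess.add(uri);
        }
        return uri;
    }

    @Override
    public synchronized void finished(String uri) {
        inProcess.remove(uri);
    }

    synchronized int inProcessCount() {
        return inProcess.size();
    }
}
```

A crawl thread's loop is then simply: call next(), fetch and analyze the URI, call schedule() for each extracted link, and finally call finished().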

III. Main Member Variables (BdbFrontier)

1. protected transient UriUniqFilter alreadyIncluded

 

protected transient UriUniqFilter alreadyIncluded;

Declared in WorkQueueFrontier:

    protected abstract UriUniqFilter createAlreadyIncluded() throws IOException;

Implemented in BdbFrontier:

    /**
     * Create a UriUniqFilter that will serve as record
     * of already seen URIs.
     *
     * @return A UURISet that will serve as a record of already seen URIs
     * @throws IOException
     */
    protected UriUniqFilter createAlreadyIncluded() throws IOException {
        UriUniqFilter uuf;
        String c = null;
        try {
            c = (String)getAttribute(null, ATTR_INCLUDED);
        } catch (AttributeNotFoundException e) {
            // Do default action if attribute not in order.
        }
        // TODO: avoid all this special-casing; enable some common
        // constructor interface usable for all alt implementations
        if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {
            uuf = this.controller.isCheckpointRecover()?
                    deserializeAlreadySeen(BloomUriUniqFilter.class,
                        this.controller.getCheckpointRecover().getDirectory()):
                    new BloomUriUniqFilter();
        } else if (c!=null && c.equals(MemFPMergeUriUniqFilter.class.getName())) {
            // TODO: add checkpointing for MemFPMergeUriUniqFilter
            uuf = new MemFPMergeUriUniqFilter();
        } else if (c!=null && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {
            // TODO: add checkpointing for DiskFPMergeUriUniqFilter
            uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());
        } else {
            // Assume it's BdbUriUniqFilter.
            uuf = this.controller.isCheckpointRecover()?
                deserializeAlreadySeen(BdbUriUniqFilter.class,
                    this.controller.getCheckpointRecover().getDirectory()):
                new BdbUriUniqFilter(this.controller.getBdbEnvironment());
            if (this.controller.isCheckpointRecover()) {
                // If recover, need to call reopen of the db.
                try {
                    ((BdbUriUniqFilter)uuf).
                        reopen(this.controller.getBdbEnvironment());
                } catch (DatabaseException e) {
                    throw new IOException(e.getMessage());
                }
            }
        }
        uuf.setDestination(this);
        return uuf;
    }

By default, that is, whenever the ATTR_INCLUDED attribute does not name one of the other filter classes, a BdbUriUniqFilter is instantiated. BdbUriUniqFilter deduplicates URLs with a BDB database keyed by each URL's fingerprint. It is fairly simple, so I won't go into detail here.
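The idea behind BdbUriUniqFilter, storing a fixed-size fingerprint per URL instead of the URL itself, can be sketched with the JDK alone. In this hypothetical FingerprintUriFilter, a HashSet<Long> stands in for the BDB database and the first 8 bytes of an MD5 digest stand in for Heritrix's own fingerprint function. Note also that the real filter does not return a boolean; it passes new URIs on to its destination (the frontier registered through setDestination).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical dedup filter: remembers a 64-bit fingerprint per URI, not the URI. */
final class FingerprintUriFilter {
    private final Set<Long> seen = new HashSet<>(); // stand-in for the BDB database

    /** 64-bit fingerprint: first 8 bytes of MD5 (a stand-in, not Heritrix's function). */
    static long fingerprint(String uri) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(uri.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest, 0, 8).getLong();
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    /** Records the URI and returns true if it is new; returns false if already seen. */
    synchronized boolean add(String uri) {
        return seen.add(fingerprint(uri));
    }

    synchronized int count() {
        return seen.size();
    }
}
```

Storing 8 bytes per URL keeps the seen-set compact regardless of URL length, at the cost of a tiny probability that two different URLs share a fingerprint and one of them is wrongly treated as already seen.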

2. protected transient ObjectIdentityCache<String,WorkQueue> allQueues

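This member holds all the WorkQueues. By default it is implemented by ObjectIdentityBdbCache, a large-capacity, Map-like object cache persisted in BDB. In my view this class is a textbook example of a single-node object cache, and its code is quite interesting: it uses all four kinds of Java references. It is well worth reading if you are interested.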

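The keys of this cache are produced by public String getClassKey(CandidateURI cauri). Every URL maps to a class key, typically derived from the hostname or from a hash code of the IP address. The mapping is defined by the abstract class QueueAssignmentPolicy; to implement your own queue-assignment strategy, extend that class.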

    /**
     * @param cauri CrawlURI we're to get a key for.
     * @return a String token representing a queue
     */
    public String getClassKey(CandidateURI cauri) {
        String queueKey = (String)getUncheckedAttribute(cauri,
            ATTR_FORCE_QUEUE);
        if ("".equals(queueKey)) {
            // no forced override
            QueueAssignmentPolicy queueAssignmentPolicy =
                getQueueAssignmentPolicy(cauri);
            queueKey =
                queueAssignmentPolicy.getClassKey(this.controller, cauri);
        }
        return queueKey;
    }

    protected QueueAssignmentPolicy getQueueAssignmentPolicy(CandidateURI cauri) {
        String clsName = (String)getUncheckedAttribute(cauri,
                ATTR_QUEUE_ASSIGNMENT_POLICY);
        try {
            return (QueueAssignmentPolicy) Class.forName(clsName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Corresponding configuration:
    <string name="queue-assignment-policy">org.archive.crawler.frontier.HostnameQueueAssignmentPolicy</string>
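The shape of getClassKey (a forced queue key wins; otherwise a policy derives the key from the URI) can be mimicked for the hostname case as follows. HostQueueKeys is hypothetical, and the "host#port" format for explicit ports and the "no-host" fallback are assumptions made for illustration, not necessarily what HostnameQueueAssignmentPolicy produces.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical hostname-based class-key assignment with a forced-queue override. */
final class HostQueueKeys {
    static String classKey(String uri, String forcedQueue) {
        if (forcedQueue != null && !forcedQueue.isEmpty()) {
            return forcedQueue;                  // explicit override always wins
        }
        URI u = URI.create(uri);
        String host = u.getHost();
        if (host == null) {
            return "no-host";                    // assumption: e.g. opaque URIs like dns:
        }
        host = host.toLowerCase(Locale.ROOT);    // hostnames are case-insensitive
        // Assumption: an explicit port gets its own queue, written as host#port.
        return u.getPort() == -1 ? host : host + "#" + u.getPort();
    }
}
```

Because every URI of a host lands in the same queue, per-queue politeness automatically becomes per-host politeness.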

 

3. protected BlockingQueue<String> readyClassQueues

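Holds the class keys of queues whose first item is ready and waiting to be handed out. When a ToeThread calls next(), the frontier tries to take the first class key from this queue, looks up the corresponding WorkQueue in allQueues, and returns that WorkQueue's first CrawlURI to the ToeThread for fetching.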
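A minimal sketch of that path, assuming strings in place of CrawlURI and a plain ArrayDeque per class key in place of WorkQueue (ReadyQueueFrontier is a hypothetical name). The head URI stays in its queue while it is being processed and is removed only in finished(), so each class key appears in readyClassQueues at most once and a queue never has two URIs in flight. Unlike the real frontier, finished() here puts the queue straight back into the ready set instead of first snoozing it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: class keys in readyClassQueues, URIs in per-key queues. */
final class ReadyQueueFrontier {
    private final BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();
    private final Map<String, Deque<String>> allQueues = new ConcurrentHashMap<>();

    /** Adds a URI to its queue; a queue that was empty becomes ready. */
    void schedule(String classKey, String uri) {
        Deque<String> q = allQueues.computeIfAbsent(classKey, k -> new ArrayDeque<>());
        synchronized (q) {
            q.addLast(uri);
            if (q.size() == 1) {
                readyClassQueues.add(classKey);
            }
        }
    }

    /** Waits up to timeoutMs for a ready key and returns the head URI of that queue. */
    String next(long timeoutMs) {
        String key;
        try {
            key = readyClassQueues.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (key == null) {
            return null; // nothing became ready in time
        }
        Deque<String> q = allQueues.get(key);
        synchronized (q) {
            return q.peekFirst(); // the head stays queued until finished()
        }
    }

    /** Removes the finished head; if more URIs wait, the queue becomes ready again. */
    void finished(String classKey) {
        Deque<String> q = allQueues.get(classKey);
        synchronized (q) {
            q.pollFirst();
            if (!q.isEmpty()) {
                readyClassQueues.add(classKey); // the real frontier may snooze it first
            }
        }
    }
}
```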

4. protected int targetSizeForReadyQueues;

The target (minimum) size to keep readyClassQueues at.

5. protected transient Semaphore readyFiller = new Semaphore(1)

A single-permit semaphore, used in next() when trying to move inactive queues into readyClassQueues, so that only one thread does this at a time.
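How targetSizeForReadyQueues and the single-permit readyFiller could work together is sketched below; ReadyFiller and fillReadyQueues are hypothetical names. Whichever thread wins tryAcquire() moves class keys from inactiveQueues into readyClassQueues until the target size is reached, while threads that lose simply skip the refill instead of blocking.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch: top up the ready queue from the inactive queue, one thread at a time. */
final class ReadyFiller {
    final BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<>();
    final Queue<String> inactiveQueues = new ConcurrentLinkedQueue<>();
    final Semaphore readyFiller = new Semaphore(1); // single permit
    final int targetSizeForReadyQueues;

    ReadyFiller(int targetSizeForReadyQueues) {
        this.targetSizeForReadyQueues = targetSizeForReadyQueues;
    }

    /** Returns how many queues this call activated (0 if another thread holds the permit). */
    int fillReadyQueues() {
        if (!readyFiller.tryAcquire()) {
            return 0; // someone else is already refilling; don't wait
        }
        int activated = 0;
        try {
            while (readyClassQueues.size() < targetSizeForReadyQueues) {
                String key = inactiveQueues.poll();
                if (key == null) {
                    break; // nothing left to activate
                }
                readyClassQueues.add(key);
                activated++;
            }
        } finally {
            readyFiller.release(); // always give the permit back
        }
        return activated;
    }
}
```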

6. protected Queue<String> inactiveQueues

Like readyClassQueues, but holds the class keys of inactive work queues.

7. protected Queue<String> retiredQueues

Class keys of retired work queues: 'retired' queues, no longer considered for activation. A queue is retired, for example, once it has used up its budget.

8. protected Bag inProcessQueues = BagUtils.synchronizedBag(new HashBag());

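Class keys of work queues that have been handed out but not yet finished. It is conceptually close to a HashSet, except that a Bag (multiset) also counts how many times each key has been added.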
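The declaration above uses Apache Commons Collections (a HashBag wrapped by BagUtils.synchronizedBag). For readers unfamiliar with bags, the JDK-only CountingBag below, a hypothetical helper that is not part of Heritrix, shows the multiset semantics: the same key can be added more than once and stays present until every occurrence has been removed.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical thread-safe multiset: remembers how many times each item was added. */
final class CountingBag<T> {
    private final ConcurrentHashMap<T, Integer> counts = new ConcurrentHashMap<>();

    void add(T item) {
        counts.merge(item, 1, Integer::sum); // atomically increment the count
    }

    /** Removes one occurrence; the entry disappears once its count reaches zero. */
    void removeOne(T item) {
        counts.computeIfPresent(item, (k, n) -> n > 1 ? n - 1 : null);
    }

    int getCount(T item) {
        return counts.getOrDefault(item, 0);
    }

    boolean contains(T item) {
        return counts.containsKey(item);
    }
}
```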

9. protected SortedSet<WorkQueue> snoozedClassQueues;

All per-class queues held in snoozed state, sorted by wake time; in other words, work queues that are asleep, ordered by when each one is due to wake up.
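The snoozed set can be modeled with a TreeSet ordered by wake time (for instance, a queue sleeping off a politeness delay after a fetch). In this hypothetical sketch, SnoozedQueue stands in for WorkQueue and timestamps are passed in explicitly so the behavior is deterministic. A secondary comparison on the class key keeps two queues with the same wake time from comparing as equal, which would make the TreeSet silently drop one of them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical stand-in for WorkQueue: a class key and the time it may wake. */
final class SnoozedQueue {
    final String classKey;
    final long wakeTime; // epoch millis

    SnoozedQueue(String classKey, long wakeTime) {
        this.classKey = classKey;
        this.wakeTime = wakeTime;
    }
}

/** Hypothetical snoozed-queue set, earliest wake time first. */
final class SnoozeSet {
    private final SortedSet<SnoozedQueue> snoozed = new TreeSet<>(
            Comparator.comparingLong((SnoozedQueue q) -> q.wakeTime)
                      .thenComparing(q -> q.classKey)); // tie-break keeps distinct queues

    synchronized void snooze(String classKey, long now, long delayMs) {
        snoozed.add(new SnoozedQueue(classKey, now + delayMs));
    }

    /** Removes and returns, in wake order, the keys of all queues due by 'now'. */
    synchronized List<String> wakeDue(long now) {
        List<String> woken = new ArrayList<>();
        while (!snoozed.isEmpty() && snoozed.first().wakeTime <= now) {
            SnoozedQueue q = snoozed.first();
            snoozed.remove(q);
            woken.add(q.classKey);
        }
        return woken;
    }
}
```

Because the set is sorted, waking queues only ever has to look at its first element, so a check that finds nothing due costs almost nothing.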

Initialization of these queues:

    /**
     * Set up the various queues-of-queues used by the frontier. Override
     * in implementing subclasses to reduce or eliminate risk of queues
     * growing without bound.
     */
    protected void initQueuesOfQueues() {
        // small risk of OutOfMemoryError: if 'hold-queues' is false,
        // readyClassQueues may grow in size without bound
        readyClassQueues = new LinkedBlockingQueue<String>();
        // risk of OutOfMemoryError: in large crawls,
        // inactiveQueues may grow in size without bound
        inactiveQueues = new LinkedBlockingQueue<String>();
        // risk of OutOfMemoryError: in large crawls with queue max-budgets,
        // retiredQueues may grow in size without bound
        retiredQueues = new LinkedBlockingQueue<String>();
        // small risk of OutOfMemoryError: in large crawls with many
        // unresponsive queues, an unbounded number of snoozed queues
        // may exist
        snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());
    }

 

IV. Main Methods and Their Flow

Scheduling sequence diagram:

Flowchart of schedule:

Flowchart of next:

Flowchart of finished(CrawlURI curi):

 


Reposted from: https://blog.csdn.net/wliufu/article/details/84433280
