Heritrix源码分析之URI调度详解
发布日期:2021-09-16 04:36:31 浏览次数:14 分类:技术文章

本文共 8488 字,大约阅读时间需要 28 分钟。

一. 简述

URI调度,简单的来说就是提供一个分配URI和加入URI的方法,抓取线程通过分配URI获取待抓取URI,抓取分析完成后需要把希望继续抓取的URI加入到调度器内,等待调度。Heritrix的CrawlController是通过定义一个

Java代码  
  1. private transient Frontier frontier  

 来实现调度器的管理的,Heritrix提供了若干个调度器的实现,当然也可以根据自己的实际需要改写或完全重新定义自己的调度器,可以通过 order.xml定义frontier为自定义的实现类。默认的实现类是BdbFrontier,一个基于BDB持久化的调度器实现,以下是其配置例子

Xml代码  
  1. <newObject name="frontier" class="org.archive.crawler.frontier.BdbFrontier"><!-- Frontier 调度器-->  
  2.       <float name="delay-factor">4.0</float><!-- 从同一个服务器(host)获取需要等待的间隔时间,可以预防无节制的抓取一个网站.通常是用该值去乘以上一个url的抓取时间来表示为下一个url需要等待的时间 -->  
  3.       <integer name="max-delay-ms">20000</integer><!-- 最大的等待时间,单位毫秒 -->  
  4.       <integer name="min-delay-ms">2000</integer><!--  最小等待时间,单位毫秒-->  
  5.       <integer name="respect-crawl-delay-up-to-secs">300</integer><!--当读取robots.txt时推迟抓取的时间,单位毫秒 -->  
  6.       <integer name="max-retries">30</integer><!-- 已经尝试失败的URI的重新尝试次数,很多人在跑Heritrix的时候,发现只跑了30个URL就停止了,其实是一个URL都没成功,它这里重试了30次 -->  
  7.       <long name="retry-delay-seconds">900</long><!--默认多长时间我们重新去抓取一个检索失败的URI -->  
  8.       <integer name="preference-embed-hops">1</integer><!--嵌入或者重定向URI调度等级,例如,该值为1(默认也为1),调度时将比普通的link等级高.如果设置为0,则和link一样 -->  
  9.       <integer name="total-bandwidth-usage-KB-sec">0</integer><!--爬虫所允许的最大宽带平均数,实际的读取速度是不受此影响的,当爬虫使用的宽带接近极限时,它会阻碍新的URI去处理,0表示没有限制 -->  
  10.       <integer name="max-per-host-bandwidth-usage-KB-sec">0</integer><!--爬虫允许的每个域名所使用的最大宽带数,实际的读取速度不会受此影响,当爬虫使用的宽带接近极限时,它会阻碍新的URI去处理,0表示没有限制 -->  
  11.       <string name="queue-assignment-policy">org.archive.crawler.frontier.HostnameQueueAssignmentPolicy</string><!--定义如何去分配URI到各个队列,这个类是相同的host的url就属于同一个队列 -->  
  12.       <string name="force-queue-assignment"></string><!--强制URI的队列名字, -->  
  13.       <boolean name="pause-at-start">false</boolean><!-- 在URI被尝试前,当爬虫启动后是否暂停?这个操作可以在爬虫工作前核实或调整爬虫。默认为false -->  
  14.       <boolean name="pause-at-finish">false</boolean><!-- 当爬虫结束时是否暂停,而不是立刻停止工作.这个操作可以在爬虫状态还是可用时,有机会去显示爬虫结果,并有可能去增加URI和调整setting,默认为false-->  
  15.       <boolean name="source-tag-seeds">false</boolean><!-- 是否去标记通过种子抓取的uri作为种子的遗传,用source值代替.-->  
  16.       <boolean name="recovery-log-enabled">true</boolean><!--设置为false表示禁用恢复日志写操作,为true时候表示你用checkpoint去恢复crawl销毁的数据 -->  
  17.       <boolean name="hold-queues">true</boolean><!-- 当队列数量未达到时,是否不让其运行,达到了才运行。是否要去持久化一个创建的每个域名一个的URI工作队列直到他们需要一直繁忙(开始工作)。如果为 false(默认值),队列会在任何时间提供URI去抓取。如果为true,则队列一开始(还有收集的url)会处于不在活动中的状态,只有在 Frontier需要另外一个队列使得所有线程繁忙的时候才会让一个新的队列出于活动状态. -->  
  18.       <integer name="balance-replenish-amount">3000</integer><!--补充一定的数量去使得队列平衡,更大的数目则意味着更多的URI将在它们处于等待队列停用之前将被尝试 -->  
  19.       <integer name="error-penalty-amount">100</integer><!-- 当队列中的一个URI处理失败时,需要另外处罚的数量.加速失活或问题队列,反应迟钝的网站完全退休。,默认为100-->  
  20.       <long name="queue-total-budget">-1</long><!--单个队列所允许的活动的开支,队列超出部分将被重试或者不再抓取,默认为-1,则表示没有这个限制 -->  
  21.       <string name="cost-policy">org.archive.crawler.frontier.ZeroCostAssignmentPolicy</string><!-- 用于计算每个URI成本,默认为UnitCostAssignmentPolicy则认为每个URI的成本为1-->  
  22.       <long name="snooze-deactivate-ms">300000</long><!--任何snooze延迟都会影响队列不活动,允许其他队列有机会进入活动状态,通常设置为比在成功获取时暂停时间长,比连接失败短,默认为5分钟 -->  
  23.       <integer name="target-ready-backlog">50</integer><!--准备积压队列的目标大小,这里多个队列将会进入准备状态即使线程不再等待.只有hold-queues为true才有效,默认为50 -->  
  24.       <string name="uri-included-structure">org.archive.crawler.util.BdbUriUniqFilter</string><!-- -->  
  25.       <boolean name="dump-pending-at-close">false</boolean><!-- -->  
  26.     </newObject>  

 这些配置属性在稍后的代码分析中可以看到是怎样使用的。

二. 接口定义

 

这里先解释一下主要的几个方法:

initialize :调度器初始化入口

next :由抓取线程调用该方法以 获取待抓取uri

schedule :由抓取线程调用该方法以将指定需要抓取的uri加入调度器

finished : 由抓取线程调用该方法以处理uri抓取结果

loadSeeds : 加载种子

start :开始工作

三. 主要的成员变量分析(BdbFrontier)

1. protected transient UriUniqFilter alreadyIncluded

 

Java代码  
  1. protected transient UriUniqFilter alreadyIncluded;  
  2. 由WorkQueueFrontier定义:  
  3. protected abstract UriUniqFilter createAlreadyIncluded() throws IOException  
  4. BdbFrontier实现:  
  5.   
  6.     /** 
  7.      * Create a UriUniqFilter that will serve as record  
  8.      * of already seen URIs. 
  9.      * 
  10.      * @return A UURISet that will serve as a record of already seen URIs 
  11.      * @throws IOException 
  12.      */  
  13.     protected UriUniqFilter createAlreadyIncluded() throws IOException {  
  14.         UriUniqFilter uuf;  
  15.         String c = null;  
  16.         try {  
  17.             c = (String)getAttribute(null, ATTR_INCLUDED);  
  18.         } catch (AttributeNotFoundException e) {  
  19.             // Do default action if attribute not in order.  
  20.         }  
  21.         // TODO: avoid all this special-casing; enable some common  
  22.         // constructor interface usable for all alt implemenations  
  23.         if (c != null && c.equals(BloomUriUniqFilter.class.getName())) {  
  24.             uuf = this.controller.isCheckpointRecover()?  
  25.                     deserializeAlreadySeen(BloomUriUniqFilter.class,  
  26.                         this.controller.getCheckpointRecover().getDirectory()):  
  27.                     new BloomUriUniqFilter();  
  28.         } else if (c!=null && c.equals(MemFPMergeUriUniqFilter.class.getName())) {  
  29.             // TODO: add checkpointing for MemFPMergeUriUniqFilter  
  30.             uuf = new MemFPMergeUriUniqFilter();  
  31.         } else if (c!=null && c.equals(DiskFPMergeUriUniqFilter.class.getName())) {  
  32.             // TODO: add checkpointing for DiskFPMergeUriUniqFilter  
  33.             uuf = new DiskFPMergeUriUniqFilter(controller.getScratchDisk());  
  34.         } else {  
  35.             // Assume its BdbUriUniqFilter.  
  36.             uuf = this.controller.isCheckpointRecover()?  
  37.                 deserializeAlreadySeen(BdbUriUniqFilter.class,  
  38.                     this.controller.getCheckpointRecover().getDirectory()):  
  39.                 new BdbUriUniqFilter(this.controller.getBdbEnvironment());  
  40.             if (this.controller.isCheckpointRecover()) {  
  41.                 // If recover, need to call reopen of the db.  
  42.                 try {  
  43.                     ((BdbUriUniqFilter)uuf).  
  44.                         reopen(this.controller.getBdbEnvironment());  
  45.                 } catch (DatabaseException e) {  
  46.                     throw new IOException(e.getMessage());  
  47.                 }  
  48.             }     
  49.         }  
  50.         uuf.setDestination(this);  
  51.         return uuf;  
  52.     }  
  53.   
  54. 默认使用BdbUriUniqFilter实例化  
 BdbUriUniqFilter使用bdb数据库进行url去重,key为url的指纹,比较简单,就不细讲了。

2. protected transient ObjectIdentityCache<String,WorkQueue> allQueues

该成员保持所有的workQueue,默认情况使用ObjectIdentityBdbCache实现,一个使用BDB持久化的大容量对象缓存实 现,类似于Map,个人觉得这个类是比较经典的单节点对象缓存实现类,代码写的也比较有意思,其中使用了Java的四种引用。大家有兴趣可以自己看看。

该实例的key由public String getClassKey(CandidateURI cauri)方法生成,每个url对应着一个class key,一般有hostname,ip的hashcode等,具体由QueueAssignmentPolicy抽象类定义,如果想要实现自己的队列分配 策略,可以继承该类实现。

Java代码  
  1.     /** 
  2.      * @param cauri CrawlURI we're to get a key for. 
  3.      * @return a String token representing a queue 
  4.      */  
  5.     public String getClassKey(CandidateURI cauri) {  
  6.         String queueKey = (String)getUncheckedAttribute(cauri,  
  7.             ATTR_FORCE_QUEUE);  
  8.         if ("".equals(queueKey)) {  
  9.             // no forced override  
  10.             QueueAssignmentPolicy queueAssignmentPolicy =   
  11.                 getQueueAssignmentPolicy(cauri);  
  12.             queueKey =  
  13.                 queueAssignmentPolicy.getClassKey(this.controller, cauri);  
  14.         }  
  15.         return queueKey;  
  16.     }  
  17.     protected QueueAssignmentPolicy getQueueAssignmentPolicy(CandidateURI cauri) {  
  18.         String clsName = (String)getUncheckedAttribute(cauri,  
  19.                 ATTR_QUEUE_ASSIGNMENT_POLICY);  
  20.         try {  
  21.             return (QueueAssignmentPolicy) Class.forName(clsName).newInstance();  
  22.         } catch (Exception e) {  
  23.             throw new RuntimeException(e);  
  24.         }  
  25.     }  
  26. 配置:  
  27. <string name="queue-assignment-policy">org.archive.crawler.frontier.HostnameQueueAssignmentPolicy</string><!-- 定义如何去分配URI到各个队列,这个类是相同的host的url就属于同一个队列 -->  

 

3. protected BlockingQueue<String> readyClassQueues

存放队列的第一项准备好了等待分配出去的队列的class key,在toethread调用next()方法的时候,会尝试从该队列取出第一个class key,然后再到allQueues取出对应的WorkQueue,然后把WorkQueue的第一项CrawlURI返回给toethread进行抓 取。

4. protected int targetSizeForReadyQueues;

 Target (minimum) size to keep readyClassQueues

5. protected transient Semaphore readyFiller = new Semaphore(1)

单线程信号量,在next()方法尝试把不活跃的队列加入到readyClassQueues时用到

6. protected Queue<String> inactiveQueues

类似readyClassQueues,这里存放的是不活跃的工作队列的class key

7. protected Queue<String> retiredQueues

需要重试的工作队列的class key。

'retired' queues, no longer considered for activation

8. protected Bag inProcessQueues = BagUtils.synchronizedBag(new HashBag());

已经被分配了但是还未完成的工作队列的class key,可以看成一个HashSet

9. protected SortedSet<WorkQueue> snoozedClassQueues;

All per-class queues held in snoozed state, sorted by wake time,可以理解成处于休眠状态的工作队列,等待唤醒时间排序,即多久后唤醒某一队列

相关队列初始化:

Java代码  
  1. /** 
  2.  * Set up the various queues-of-queues used by the frontier. Override 
  3.  * in implementing subclasses to reduce or eliminate risk of queues 
  4.  * growing without bound.  
  5.  */  
  6. protected void initQueuesOfQueues() {  
  7.     // small risk of OutOfMemoryError: if 'hold-queues' is false,  
  8.     // readyClassQueues may grow in size without bound  
  9.     readyClassQueues = new LinkedBlockingQueue<String>();  
  10.     // risk of OutOfMemoryError: in large crawls,   
  11.     // inactiveQueues may grow in size without bound  
  12.     inactiveQueues = new LinkedBlockingQueue<String>();  
  13.     // risk of OutOfMemoryError: in large crawls with queue max-budgets,   
  14.     // inactiveQueues may grow in size without bound  
  15.     retiredQueues = new LinkedBlockingQueue<String>();  
  16.     // small risk of OutOfMemoryError: in large crawls with many   
  17.     // unresponsive queues, an unbounded number of snoozed queues   
  18.     // may exist  
  19.     snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());  
  20. }  

 

四. 主要方法及其流程分析

 调度时序图:

schedule流程图:

next流程图

finished(CrawlURI curi) 流程图

 

参考:

 

转载地址:https://blog.csdn.net/wliufu/article/details/84433466 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:常用资源地址集
下一篇:Heritrix源码分析之URI调度详解

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年03月02日 00时52分11秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

带bitlocker解密的pe_如何在PE下解锁bitlocker 2019-04-21
lj245a引脚功能图_谁找到74254,74LS245芯片引脚的功能和功能图啊? 2019-04-21
sts 创建webservice项目_通过eclipse将Java生成webservice | 学步园 2019-04-21
python数字字符串和数字相加_数字和字符串 2019-04-21
python风控模型举例_一文搞定风控模型6大核心指标(附代码) 2019-04-21
java arraylist 写入文件_java-将自定义对象的ArrayList写入文件 2019-04-21
ice glacier2 java_ICE提纲之demo/Glacier2/callback(跨网回调) 2019-04-21
java 转发上传文件_java 后台请求其他接口转发文件 2019-04-21
Java get set 同步_java – getResultSet()“每个结果只能调用一次” 2019-04-21
java jmx 配置_为什么在配置JMX时Java打开3个端口? 2019-04-21
java thread回调_使用Runnable在Java中实现回调 2019-04-21
java 内存区_Java内存模型和Java内存区域的区别和联系? 2019-04-21
java定时任务监控_Spring定时任务使用及如何使用邮件监控服务器 2019-04-21
java crc32 使用_Java CRC32的用法 2019-04-21
java读取unicode_java怎么样将unicode解码读取?Java读取本地文件进 2019-04-21
java.io.file()_Java File getUsableSpace()方法 2019-04-21
java httpclient 工具_spring整合httpClient工具类 2019-04-21
java监控其他服务器运行状态_windows服务器监控多个tomcat运行状态 2019-04-21
java给学生按总成绩排名_java - 输入学生成绩,取它们的平均值,然后通过排名等级的学生 - SO中文参考 - www.soinside.com... 2019-04-21
java构造函数有什么用_java构造函数有什么用,怎么用 2019-04-21