View Javadoc

1   /* FPMergeUriUniqFilter
2   *
3   * $Id: FPMergeUriUniqFilter.java 4647 2006-09-22 18:39:39Z paul_jack $
4   *
5   * Created on Sep 29, 2005
6   *
7   * Copyright (C) 2005 Internet Archive.
8   *
9   * This file is part of the Heritrix web crawler (crawler.archive.org).
10  *
11  * Heritrix is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser Public License as published by
13  * the Free Software Foundation; either version 2.1 of the License, or
14  * any later version.
15  *
16  * Heritrix is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU Lesser Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser Public License
22  * along with Heritrix; if not, write to the Free Software
23  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24  */ 
25  package org.archive.crawler.util;
26  
27  import it.unimi.dsi.fastutil.longs.LongIterator;
28  
29  import java.io.BufferedOutputStream;
30  import java.io.File;
31  import java.io.FileNotFoundException;
32  import java.io.FileOutputStream;
33  import java.io.PrintWriter;
34  import java.util.Iterator;
35  import java.util.TreeSet;
36  import java.util.logging.Level;
37  import java.util.logging.Logger;
38  
39  import org.archive.crawler.datamodel.CandidateURI;
40  import org.archive.crawler.datamodel.UriUniqFilter;
41  import org.archive.util.fingerprint.ArrayLongFPCache;
42  
43  import st.ata.util.FPGenerator;
44  
45  /***
46   * UriUniqFilter based on merging FP arrays (in memory or from disk). 
47   * 
48   * Inspired by the approach in Najork and Heydon, "High-Performance
49   * Web Crawling" (2001), section 3.2, "Efficient Duplicate URL 
50   * Eliminators". 
51   * 
52   * @author gojomo
53   */
54  public abstract class FPMergeUriUniqFilter implements UriUniqFilter {
55      /***
56       * Represents a long fingerprint and (possibly) its corresponding
57       * CandidateURI, awaiting the next merge in a 'pending' state. 
58       */
59      public class PendingItem implements Comparable {
60          long fp;
61          CandidateURI caUri;
62          public PendingItem(long fp, CandidateURI value) {
63              this.fp = fp;
64              this.caUri = value;
65          }
66          public int compareTo(Object arg0) {
67              PendingItem vs = (PendingItem) arg0;
68              return (fp < vs.fp) ? -1 : ( (fp == vs.fp) ? 0 : 1); 
69          }
70      }
71      
72      private static Logger LOGGER =
73          Logger.getLogger(FPMergeUriUniqFilter.class.getName());
74  
75      protected HasUriReceiver receiver;
76      protected PrintWriter profileLog;
77      
78      // statistics
79      protected long quickDuplicateCount = 0;
80      protected long quickDupAtLast = 0; 
81      protected long pendDuplicateCount = 0;
82      protected long pendDupAtLast = 0; 
83      protected long mergeDuplicateCount = 0;
84      protected long mergeDupAtLast = 0; 
85      
86      /*** items awaiting merge
87       * TODO: consider only sorting just pre-merge
88       * TODO: consider using a fastutil long->Object class
89       * TODO: consider actually writing items to disk file,
90       * as in Najork/Heydon
91       */
92      protected TreeSet<PendingItem> pendingSet = new TreeSet<PendingItem>();
93      
94      /*** size at which to force flush of pending items */
95      protected int maxPending = DEFAULT_MAX_PENDING;
96      public static final int DEFAULT_MAX_PENDING = 10000; 
97      // TODO: increase
98      
99      /***
100      * time-based throttle on flush-merge operations
101      */
102     protected long nextFlushAllowableAfter = 0;
103     public static final long FLUSH_DELAY_FACTOR = 100;
104 
105     /*** cache of most recently seen FPs */
106     protected ArrayLongFPCache quickCache = new ArrayLongFPCache();
107     // TODO: make cache most-often seen, not just most-recent
108     
109     public FPMergeUriUniqFilter() {
110         super();
111         String profileLogFile = 
112             System.getProperty(FPMergeUriUniqFilter.class.getName()
113                 + ".profileLogFile");
114         if (profileLogFile != null) {
115             setProfileLog(new File(profileLogFile));
116         }
117     }
118 
119     public void setMaxPending(int max) {
120         maxPending = max;
121     }
122     
123     public long pending() {
124         return pendingSet.size();
125     }
126 
127     public void setDestination(HasUriReceiver receiver) {
128         this.receiver = receiver;
129     }
130 
131     protected void profileLog(String key) {
132         if (profileLog != null) {
133             profileLog.println(key);
134         }
135     }
136     
137     /* (non-Javadoc)
138      * @see org.archive.crawler.datamodel.UriUniqFilter#add(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
139      */
140     public synchronized void add(String key, CandidateURI value) {
141         profileLog(key);
142         long fp = createFp(key); 
143         if(! quickCheck(fp)) {
144             quickDuplicateCount++;
145             return; 
146         }
147         pend(fp,value);
148         if (pendingSet.size()>=maxPending) {
149             flush();
150         }
151     }
152 
153     /***
154      * Place the given FP/CandidateURI pair into the pending set, awaiting
155      * a merge to determine if it's actually accepted. 
156      * 
157      * @param fp long fingerprint
158      * @param value CandidateURI or null, if fp only needs merging (as when 
159      * CandidateURI was already forced in
160      */
161     protected void pend(long fp, CandidateURI value) {
162         // special case for first batch of adds
163         if(count()==0) {
164             if(pendingSet.add(new PendingItem(fp,null))==false) {
165                 pendDuplicateCount++; // was already present
166             } else {
167                 // since there's no prior list to merge, push uri along right now
168                 if(value!=null) {
169                     this.receiver.receive(value);
170                 }
171             }
172             return;
173         }
174         if(pendingSet.add(new PendingItem(fp,value))==false) {
175             pendDuplicateCount++; // was already present
176         }
177     }
178 
179     /***
180      * Evaluate if quick-check cache considers fingerprint novel enough
181      * for further consideration. 
182      * 
183      * @param fp long fingerprint to check
184      * @return true if fp deserves consideration; false if it appears in cache
185      */
186     private boolean quickCheck(long fp) {
187         return quickCache.add(fp);
188     }
189 
190     /***
191      * Create a fingerprint from the given key
192      * 
193      * @param key CharSequence (URI) to fingerprint
194      * @return long fingerprint
195      */
196     public static long createFp(CharSequence key) {
197         return FPGenerator.std64.fp(key);
198     }
199 
200 
201     /* (non-Javadoc)
202      * @see org.archive.crawler.datamodel.UriUniqFilter#addNow(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
203      */
204     public void addNow(String key, CandidateURI value) {
205         add(key, value);
206         flush();
207     }
208     
209     /* (non-Javadoc)
210      * @see org.archive.crawler.datamodel.UriUniqFilter#addForce(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
211      */
212     public void addForce(String key, CandidateURI value) {
213         add(key,null); // dummy pend
214         this.receiver.receive(value);
215     }
216 
217     /* (non-Javadoc)
218      * @see org.archive.crawler.datamodel.UriUniqFilter#note(java.lang.String)
219      */
220     public void note(String key) {
221         add(key,null);
222     }
223 
224     /* (non-Javadoc)
225      * @see org.archive.crawler.datamodel.UriUniqFilter#forget(java.lang.String, org.archive.crawler.datamodel.CandidateURI)
226      */
227     public void forget(String key, CandidateURI value) {
228         throw new UnsupportedOperationException();
229     }
230 
231     /* (non-Javadoc)
232      * @see org.archive.crawler.datamodel.UriUniqFilter#requestFlush()
233      */
234     public synchronized long requestFlush() {
235         if(System.currentTimeMillis()>nextFlushAllowableAfter) {
236             return flush();
237         } else {
238 //            LOGGER.info("declining to flush: too soon after last flush");
239             return -1; 
240         }
241     }
242 
243     /***
244      * Perform a merge of all 'pending' items to the overall fingerprint list. 
245      * If the pending item is new, and has an associated CandidateURI, pass that
246      * URI along to the 'receiver' (frontier) for queueing. 
247      * 
248      * @return number of pending items actually added 
249      */
250     public synchronized long flush() {
251         if(pending()==0) {
252             return 0;
253         }
254         long flushStartTime = System.currentTimeMillis();
255         long adds = 0; 
256         long fpOnlyAdds = 0;
257         Long currFp = null; 
258         PendingItem currPend = null; 
259         
260         Iterator pendIter = pendingSet.iterator();
261         LongIterator fpIter = beginFpMerge();
262 
263         currPend = (PendingItem) (pendIter.hasNext() ? pendIter.next() : null);
264         currFp = (Long) (fpIter.hasNext() ? fpIter.next() : null); 
265 
266         while(true) {
267             while(currFp!=null && (currPend==null||(currFp.longValue() <= currPend.fp))) {
268                 addNewFp(currFp.longValue());
269                 if(currPend!=null && currFp.longValue() == currPend.fp) {
270                     mergeDuplicateCount++;
271                 }
272                 if(fpIter.hasNext()) {
273                     currFp = (Long) fpIter.next();
274                 } else {
275                     currFp = null;
276                     break;
277                 }
278             }
279             while(currPend!=null && (currFp==null||(currFp.longValue() > currPend.fp))) {
280                 addNewFp(currPend.fp);
281                 if(currPend.caUri!=null) {
282                     adds++;
283                     this.receiver.receive(currPend.caUri);
284                 } else {
285                     fpOnlyAdds++;
286                 }
287                 if(pendIter.hasNext()) {
288                     currPend = (PendingItem)pendIter.next();
289                 } else {
290                     currPend = null;
291                     break;
292                 }
293             }
294             if(currFp==null) {
295                 // currPend must be null too, or while wouldn't have exitted
296                 // done
297                 break;
298             } 
299         }
300         // maintain throttle timing
301         long flushDuration = System.currentTimeMillis() - flushStartTime;
302         nextFlushAllowableAfter = flushStartTime + (FLUSH_DELAY_FACTOR*flushDuration);
303         
304         // add/duplicate statistics
305         if(LOGGER.isLoggable(Level.INFO)) {
306             long mergeDups = (mergeDuplicateCount-mergeDupAtLast);
307             long pendDups = (pendDuplicateCount-pendDupAtLast);
308             long quickDups = (quickDuplicateCount-quickDupAtLast);
309             LOGGER.info("flush took "+flushDuration+"ms: "
310                     +adds+" adds, "
311                     +fpOnlyAdds+" fpOnlydds, "
312                     +mergeDups+" mergeDups, "
313                     +pendDups+" pendDups, "
314                     +quickDups+" quickDups ");
315             if(adds==0 && fpOnlyAdds==0 && mergeDups == 0 && pendDups == 0 && quickDups == 0) {
316                 LOGGER.info("that's odd");
317             }
318         }
319         mergeDupAtLast = mergeDuplicateCount;
320         pendDupAtLast = pendDuplicateCount;
321         quickDupAtLast = quickDuplicateCount;
322         pendingSet.clear();
323         finishFpMerge();
324         return adds;
325     }
326     
327     /***
328      * Begin merging pending candidates with complete list. Return an
329      * Iterator which will return all previously-known FPs in turn. 
330      * 
331      * @return Iterator over all previously-known FPs
332      */
333     abstract protected LongIterator beginFpMerge();
334 
335     
336     /***
337      * Add an FP (which may be an old or new FP) to the new complete
338      * list. Should only be called after beginFpMerge() and before
339      * finishFpMerge(). 
340      * 
341      * @param fp  the FP to add
342      */
343     abstract protected void addNewFp(long fp);
344 
345     /***
346      * Complete the merge of candidate and previously-known FPs (closing
347      * files/iterators as appropriate). 
348      */
349     abstract protected void finishFpMerge();
350 
351     public void close() {
352         if (profileLog != null) {
353             profileLog.close();
354         }
355     }
356 
357     public void setProfileLog(File logfile) {
358         try {
359             profileLog = new PrintWriter(new BufferedOutputStream(
360                     new FileOutputStream(logfile)));
361         } catch (FileNotFoundException e) {
362             throw new RuntimeException(e);
363         }
364     }
365 }