1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.crawler.util;
26
27 import it.unimi.dsi.fastutil.longs.LongIterator;
28
29 import java.io.BufferedOutputStream;
30 import java.io.File;
31 import java.io.FileNotFoundException;
32 import java.io.FileOutputStream;
33 import java.io.PrintWriter;
34 import java.util.Iterator;
35 import java.util.TreeSet;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
38
39 import org.archive.crawler.datamodel.CandidateURI;
40 import org.archive.crawler.datamodel.UriUniqFilter;
41 import org.archive.util.fingerprint.ArrayLongFPCache;
42
43 import st.ata.util.FPGenerator;
44
45 /***
46 * UriUniqFilter based on merging FP arrays (in memory or from disk).
47 *
48 * Inspired by the approach in Najork and Heydon, "High-Performance
49 * Web Crawling" (2001), section 3.2, "Efficient Duplicate URL
50 * Eliminators".
51 *
52 * @author gojomo
53 */
54 public abstract class FPMergeUriUniqFilter implements UriUniqFilter {
55 /***
56 * Represents a long fingerprint and (possibly) its corresponding
57 * CandidateURI, awaiting the next merge in a 'pending' state.
58 */
59 public class PendingItem implements Comparable {
60 long fp;
61 CandidateURI caUri;
62 public PendingItem(long fp, CandidateURI value) {
63 this.fp = fp;
64 this.caUri = value;
65 }
66 public int compareTo(Object arg0) {
67 PendingItem vs = (PendingItem) arg0;
68 return (fp < vs.fp) ? -1 : ( (fp == vs.fp) ? 0 : 1);
69 }
70 }
71
72 private static Logger LOGGER =
73 Logger.getLogger(FPMergeUriUniqFilter.class.getName());
74
75 protected HasUriReceiver receiver;
76 protected PrintWriter profileLog;
77
78
79 protected long quickDuplicateCount = 0;
80 protected long quickDupAtLast = 0;
81 protected long pendDuplicateCount = 0;
82 protected long pendDupAtLast = 0;
83 protected long mergeDuplicateCount = 0;
84 protected long mergeDupAtLast = 0;
85
86 /*** items awaiting merge
87 * TODO: consider only sorting just pre-merge
88 * TODO: consider using a fastutil long->Object class
89 * TODO: consider actually writing items to disk file,
90 * as in Najork/Heydon
91 */
92 protected TreeSet<PendingItem> pendingSet = new TreeSet<PendingItem>();
93
94 /*** size at which to force flush of pending items */
95 protected int maxPending = DEFAULT_MAX_PENDING;
96 public static final int DEFAULT_MAX_PENDING = 10000;
97
98
99 /***
100 * time-based throttle on flush-merge operations
101 */
102 protected long nextFlushAllowableAfter = 0;
103 public static final long FLUSH_DELAY_FACTOR = 100;
104
105 /*** cache of most recently seen FPs */
106 protected ArrayLongFPCache quickCache = new ArrayLongFPCache();
107
108
109 public FPMergeUriUniqFilter() {
110 super();
111 String profileLogFile =
112 System.getProperty(FPMergeUriUniqFilter.class.getName()
113 + ".profileLogFile");
114 if (profileLogFile != null) {
115 setProfileLog(new File(profileLogFile));
116 }
117 }
118
119 public void setMaxPending(int max) {
120 maxPending = max;
121 }
122
123 public long pending() {
124 return pendingSet.size();
125 }
126
127 public void setDestination(HasUriReceiver receiver) {
128 this.receiver = receiver;
129 }
130
131 protected void profileLog(String key) {
132 if (profileLog != null) {
133 profileLog.println(key);
134 }
135 }
136
137
138
139
140 public synchronized void add(String key, CandidateURI value) {
141 profileLog(key);
142 long fp = createFp(key);
143 if(! quickCheck(fp)) {
144 quickDuplicateCount++;
145 return;
146 }
147 pend(fp,value);
148 if (pendingSet.size()>=maxPending) {
149 flush();
150 }
151 }
152
153 /***
154 * Place the given FP/CandidateURI pair into the pending set, awaiting
155 * a merge to determine if it's actually accepted.
156 *
157 * @param fp long fingerprint
158 * @param value CandidateURI or null, if fp only needs merging (as when
159 * CandidateURI was already forced in
160 */
161 protected void pend(long fp, CandidateURI value) {
162
163 if(count()==0) {
164 if(pendingSet.add(new PendingItem(fp,null))==false) {
165 pendDuplicateCount++;
166 } else {
167
168 if(value!=null) {
169 this.receiver.receive(value);
170 }
171 }
172 return;
173 }
174 if(pendingSet.add(new PendingItem(fp,value))==false) {
175 pendDuplicateCount++;
176 }
177 }
178
179 /***
180 * Evaluate if quick-check cache considers fingerprint novel enough
181 * for further consideration.
182 *
183 * @param fp long fingerprint to check
184 * @return true if fp deserves consideration; false if it appears in cache
185 */
186 private boolean quickCheck(long fp) {
187 return quickCache.add(fp);
188 }
189
190 /***
191 * Create a fingerprint from the given key
192 *
193 * @param key CharSequence (URI) to fingerprint
194 * @return long fingerprint
195 */
196 public static long createFp(CharSequence key) {
197 return FPGenerator.std64.fp(key);
198 }
199
200
201
202
203
204 public void addNow(String key, CandidateURI value) {
205 add(key, value);
206 flush();
207 }
208
209
210
211
212 public void addForce(String key, CandidateURI value) {
213 add(key,null);
214 this.receiver.receive(value);
215 }
216
217
218
219
220 public void note(String key) {
221 add(key,null);
222 }
223
224
225
226
227 public void forget(String key, CandidateURI value) {
228 throw new UnsupportedOperationException();
229 }
230
231
232
233
234 public synchronized long requestFlush() {
235 if(System.currentTimeMillis()>nextFlushAllowableAfter) {
236 return flush();
237 } else {
238
239 return -1;
240 }
241 }
242
243 /***
244 * Perform a merge of all 'pending' items to the overall fingerprint list.
245 * If the pending item is new, and has an associated CandidateURI, pass that
246 * URI along to the 'receiver' (frontier) for queueing.
247 *
248 * @return number of pending items actually added
249 */
250 public synchronized long flush() {
251 if(pending()==0) {
252 return 0;
253 }
254 long flushStartTime = System.currentTimeMillis();
255 long adds = 0;
256 long fpOnlyAdds = 0;
257 Long currFp = null;
258 PendingItem currPend = null;
259
260 Iterator pendIter = pendingSet.iterator();
261 LongIterator fpIter = beginFpMerge();
262
263 currPend = (PendingItem) (pendIter.hasNext() ? pendIter.next() : null);
264 currFp = (Long) (fpIter.hasNext() ? fpIter.next() : null);
265
266 while(true) {
267 while(currFp!=null && (currPend==null||(currFp.longValue() <= currPend.fp))) {
268 addNewFp(currFp.longValue());
269 if(currPend!=null && currFp.longValue() == currPend.fp) {
270 mergeDuplicateCount++;
271 }
272 if(fpIter.hasNext()) {
273 currFp = (Long) fpIter.next();
274 } else {
275 currFp = null;
276 break;
277 }
278 }
279 while(currPend!=null && (currFp==null||(currFp.longValue() > currPend.fp))) {
280 addNewFp(currPend.fp);
281 if(currPend.caUri!=null) {
282 adds++;
283 this.receiver.receive(currPend.caUri);
284 } else {
285 fpOnlyAdds++;
286 }
287 if(pendIter.hasNext()) {
288 currPend = (PendingItem)pendIter.next();
289 } else {
290 currPend = null;
291 break;
292 }
293 }
294 if(currFp==null) {
295
296
297 break;
298 }
299 }
300
301 long flushDuration = System.currentTimeMillis() - flushStartTime;
302 nextFlushAllowableAfter = flushStartTime + (FLUSH_DELAY_FACTOR*flushDuration);
303
304
305 if(LOGGER.isLoggable(Level.INFO)) {
306 long mergeDups = (mergeDuplicateCount-mergeDupAtLast);
307 long pendDups = (pendDuplicateCount-pendDupAtLast);
308 long quickDups = (quickDuplicateCount-quickDupAtLast);
309 LOGGER.info("flush took "+flushDuration+"ms: "
310 +adds+" adds, "
311 +fpOnlyAdds+" fpOnlydds, "
312 +mergeDups+" mergeDups, "
313 +pendDups+" pendDups, "
314 +quickDups+" quickDups ");
315 if(adds==0 && fpOnlyAdds==0 && mergeDups == 0 && pendDups == 0 && quickDups == 0) {
316 LOGGER.info("that's odd");
317 }
318 }
319 mergeDupAtLast = mergeDuplicateCount;
320 pendDupAtLast = pendDuplicateCount;
321 quickDupAtLast = quickDuplicateCount;
322 pendingSet.clear();
323 finishFpMerge();
324 return adds;
325 }
326
327 /***
328 * Begin merging pending candidates with complete list. Return an
329 * Iterator which will return all previously-known FPs in turn.
330 *
331 * @return Iterator over all previously-known FPs
332 */
333 abstract protected LongIterator beginFpMerge();
334
335
336 /***
337 * Add an FP (which may be an old or new FP) to the new complete
338 * list. Should only be called after beginFpMerge() and before
339 * finishFpMerge().
340 *
341 * @param fp the FP to add
342 */
343 abstract protected void addNewFp(long fp);
344
345 /***
346 * Complete the merge of candidate and previously-known FPs (closing
347 * files/iterators as appropriate).
348 */
349 abstract protected void finishFpMerge();
350
351 public void close() {
352 if (profileLog != null) {
353 profileLog.close();
354 }
355 }
356
357 public void setProfileLog(File logfile) {
358 try {
359 profileLog = new PrintWriter(new BufferedOutputStream(
360 new FileOutputStream(logfile)));
361 } catch (FileNotFoundException e) {
362 throw new RuntimeException(e);
363 }
364 }
365 }