1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.frontier;
24
25 import java.io.IOException;
26 import java.io.PrintWriter;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.TreeSet;
30 import java.util.logging.Level;
31 import java.util.logging.Logger;
32
33 import org.archive.util.ArchiveUtils;
34 import org.archive.util.Reporter;
35
36 import com.sleepycat.bind.EntryBinding;
37 import com.sleepycat.bind.serial.StoredClassCatalog;
38 import com.sleepycat.bind.tuple.IntegerBinding;
39 import com.sleepycat.bind.tuple.StringBinding;
40 import com.sleepycat.je.Cursor;
41 import com.sleepycat.je.Database;
42 import com.sleepycat.je.DatabaseConfig;
43 import com.sleepycat.je.DatabaseEntry;
44 import com.sleepycat.je.DatabaseException;
45 import com.sleepycat.je.Environment;
46 import com.sleepycat.je.LockMode;
47 import com.sleepycat.je.OperationStatus;
48
49 /***
50 * Maintains an ordered list of {@link AdaptiveRevisitHostQueue}s used by a
51 * Frontier.
52 * <p>
 * The list is ordered by
 * {@link AdaptiveRevisitHostQueue#getNextReadyTime() AdaptiveRevisitHostQueue.getNextReadyTime()},
 * smallest value at the top of the list, followed by the rest in ascending order.
56 * <p>
 * The list will maintain a list of hostnames in a separate DB. On creation a
 * list will try to open the DB at a specified location. If it already exists
 * the list will create HQs for all the hostnames in the list, discarding
 * those that turn out to be empty.
61 * <p>
62 * Any BDB DatabaseException will be converted to an IOException by public
63 * methods. This includes preserving the original stacktrace, in favor of the
64 * one created for the IOException, so that the true source of the exception
65 * is not lost.
66 *
67 * @author Kristinn Sigurdsson
68 */
69 public class AdaptiveRevisitQueueList implements Reporter {
70
71
72
/** The Environment for the BerkeleyDB databases in the HQs */
74 private final Environment env;
75 private final StoredClassCatalog catalog;
76 /*** Contains host names for all HQs. Name is key, valence is value */
77 private Database hostNamesDB;
78 private EntryBinding keyBinding;
79 private EntryBinding valueBinding;
80
81 /*** A hash table of all the HQs (wrapped), keyed by hostName */
82 private HashMap<String,AdaptiveRevisitHostQueueWrapper> hostQueues;
83 /*** Contains the hostQueues (wrapped) sorted by their time of next fetch */
84 private TreeSet<AdaptiveRevisitHostQueueWrapper> sortedHostQueues;
85 /*** Logger */
86 private static final Logger logger =
87 Logger.getLogger(AdaptiveRevisitQueueList.class.getName());
88
89
90 public AdaptiveRevisitQueueList(Environment env,
91 StoredClassCatalog catalog)
92 throws IOException {
93 Cursor cursor = null;
94 try {
95 this.env = env;
96 this.catalog = catalog;
97 keyBinding = new StringBinding();
98 valueBinding = new IntegerBinding();
99
100
101 hostQueues = new HashMap<String,AdaptiveRevisitHostQueueWrapper>();
102 sortedHostQueues = new TreeSet<AdaptiveRevisitHostQueueWrapper>();
103
104
105 DatabaseConfig dbConfig = new DatabaseConfig();
106 dbConfig.setTransactional(false);
107 dbConfig.setAllowCreate(true);
108 hostNamesDB = env.openDatabase(null, "hostNames", dbConfig);
109
110
111 cursor = hostNamesDB.openCursor(null, null);
112 DatabaseEntry keyEntry = new DatabaseEntry();
113 DatabaseEntry dataEntry = new DatabaseEntry();
114 OperationStatus opStatus = cursor.getFirst(keyEntry, dataEntry,
115 LockMode.DEFAULT);
116 while (opStatus == OperationStatus.SUCCESS) {
117
118 String hostName = (String)keyBinding.entryToObject(keyEntry);
119 int valence = ((Integer)valueBinding.entryToObject(dataEntry)).
120 intValue();
121 opStatus = cursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT);
122
123 createHQ(hostName, valence);
124
125
126 }
127 } catch (DatabaseException e) {
128 throw convertDbException(e);
129 } finally {
130 if (cursor != null) {
131 try {
132 cursor.close();
133 } catch (DatabaseException e) {
134 throw convertDbException(e);
135 }
136 }
137 }
138 }
139
140 private IOException convertDbException(Exception e) {
141 IOException e2 = new IOException(e.getMessage());
142 e2.setStackTrace(e.getStackTrace());
143 return e2;
144 }
145
146 /***
147 * Get an AdaptiveRevisitHostQueue for the specified host.
148 * <p>
149 * If one does not already exist, null is returned
150 *
151 * @param hostName The host's name
152 * @return an AdaptiveRevisitHostQueue for the specified host
153 */
154 public AdaptiveRevisitHostQueue getHQ(String hostName) {
155 AdaptiveRevisitHostQueueWrapper wrapper =
156 ((AdaptiveRevisitHostQueueWrapper)hostQueues.get(hostName));
157 if (wrapper != null) {
158 return wrapper.hq;
159 }
160 return null;
161 }
162
163 /***
164 * Creates a new AdaptiveRevisitHostQueue.
165 * <p>
166 * If a HQ already existed for the specified hostName, the existing HQ
167 * is returned as it is. It's existing valence will <i>not</i> be updated
168 * to reflect a different valence.
169 *
170 * @param hostName
171 * @param valence number of simultaneous connections allowed to this host
172 * @return the newly created HQ
173 * @throws IOException
174 */
175 public AdaptiveRevisitHostQueue createHQ(String hostName, int valence)
176 throws IOException{
177 AdaptiveRevisitHostQueueWrapper hqw = hostQueues.get(hostName);
178 if (hqw != null) {
179 return hqw.hq;
180 }
181 AdaptiveRevisitHostQueue hq;
182
183
184 hq = new AdaptiveRevisitHostQueue(hostName, env, catalog, valence);
185 hq.setOwner(this);
186
187 try{
188 DatabaseEntry keyEntry = new DatabaseEntry();
189 DatabaseEntry dataEntry = new DatabaseEntry();
190 keyBinding.objectToEntry(hostName,keyEntry);
191 valueBinding.objectToEntry(new Integer(valence),dataEntry);
192 hostNamesDB.put(null,keyEntry,dataEntry);
193 AdaptiveRevisitHostQueueWrapper tmp =
194 new AdaptiveRevisitHostQueueWrapper(hq);
195 hostQueues.put(hostName,tmp);
196 sortedHostQueues.add(tmp);
197 return hq;
198 } catch (DatabaseException e) {
199 throw convertDbException(e);
200 }
201 }
202
203 public AdaptiveRevisitHostQueue getTopHQ(){
204 AdaptiveRevisitHostQueueWrapper wrapper =
205 (AdaptiveRevisitHostQueueWrapper)sortedHostQueues.first();
206 return wrapper.hq;
207 }
208
209 /***
210 * Returns the number of URIs in all the HQs in this list
211 * @return the number of URIs in all the HQs in this list
212 */
213 public long getSize() {
214 long size = 0;
215 for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
216 AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
217 .next()).hq;
218 size += hq.getSize();
219 }
220 return size;
221 }
222
223 /***
224 * Returns the average depth of all the HQs in this list
225 * @return the average depth of all the HQs in this list (rounded down)
226 */
227 public long getAverageDepth() {
228 long size = getSize();
229 return size/hostQueues.size();
230 }
231
232 /***
233 * Returns the size of the largest (deepest) queue.
234 * @return the size of the largest (deepest) queue.
235 */
236 public long getDeepestQueueSize(){
237 long size = 0;
238 for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
239 AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
240 .next()).hq;
241 if(hq.getSize() > size){
242 size = hq.getSize();
243 }
244 }
245 return size;
246 }
247
248 /***
249 * Returns the congestion ratio.
250 * <p>
251 * The congestion ratio is equal to the total number of queues divided
252 * by the number of queues currently being processed or are snozzed (i.e.
253 * not ready). A congestion ratio of 1 indicates no congestion.
254 * @return the congestion ratio
255 */
256 public float getCongestionRatio(){
257 int readyQueues = 0;
258 for (Iterator it = sortedHostQueues.iterator(); it.hasNext();) {
259 AdaptiveRevisitHostQueue hq = ((AdaptiveRevisitHostQueueWrapper)it
260 .next()).hq;
261 if(hq.getState() == AdaptiveRevisitHostQueue.HQSTATE_READY){
262 readyQueues++;
263 }
264 }
265 int totalQueues = hostQueues.size();
266
267 return (float)(totalQueues) / (totalQueues-readyQueues);
268 }
269
270 /***
271 * This method reorders the host queues. Method is only called by the
272 * AdaptiveRevisitHostQueue that it 'owns' when their reported time of next
273 * ready is being updated.
274 *
275 * @param hq The calling HQ
276 */
277 protected void reorder(AdaptiveRevisitHostQueue hq){
278
279 AdaptiveRevisitHostQueueWrapper wrapper =
280 (AdaptiveRevisitHostQueueWrapper)hostQueues.get(hq.getHostName());
281
282 long newTime = hq.getNextReadyTime();
283
284 if(newTime != wrapper.nextReadyTime){
285
286 if (logger.isLoggable(Level.FINER)) {
287 logger.finer("reorder(" + hq.getHostName() + ") was "
288 + wrapper.nextReadyTime);
289 }
290
291 sortedHostQueues.remove(wrapper);
292
293 wrapper.nextReadyTime = newTime;
294 if (logger.isLoggable(Level.FINER)) {
295 logger.finer("reorder(" + hq.getHostName() + ") is "
296 + wrapper.nextReadyTime);
297 }
298
299 sortedHostQueues.add(wrapper);
300 }
301 }
302
303 /***
304 * The total number of URIs queued in all the HQs belonging to this list.
305 *
306 * @return total number of URIs queued in all the HQs belonging to this list.
307 */
308 public long getUriCount(){
309 Iterator it = hostQueues.keySet().iterator();
310 long count = 0;
311 while(it.hasNext()){
312 AdaptiveRevisitHostQueueWrapper wrapper =
313 (AdaptiveRevisitHostQueueWrapper)it.next();
314 count += wrapper.hq.getSize();
315 }
316 return count;
317 }
318
319 /***
320 * This class wraps an AdaptiveRevisitHostQueue with a fixed value for next
321 * ready time.
322 * This is done to facilitate sorting by making sure that the value does
323 * not change while the HQ is in the sorted list. With this wrapper, it
324 * is possible to remove it from the sorted list, then update the time of
325 * next ready, and then readd (resort) it.
326 *
327 * @author Kristinn Sigurdsson
328 */
329 private class AdaptiveRevisitHostQueueWrapper implements Comparable{
330 long nextReadyTime;
331 AdaptiveRevisitHostQueue hq;
332
333 public AdaptiveRevisitHostQueueWrapper(AdaptiveRevisitHostQueue hq){
334 nextReadyTime = hq.getNextReadyTime();
335 this.hq = hq;
336 }
337
338 /***
339 * Compares the this ARHQWrapper to the supplied one.
340 *
341 * @param obj the HQ to compare to. If this object is not an instance
342 * of ARHostQueueWrapper then the method will throw a
343 * ClassCastException.
344 * @return a value less than 0 if this HQWrappers time of next ready
345 * value is less than the argument HQWrappers's; and a value
346 * greater than 0 if this value is greater. If the time of
347 * next ready is equal, the hostName strings will be compared
348 * and that result returned.
349 */
350 public int compareTo(Object obj){
351 AdaptiveRevisitHostQueueWrapper comp =
352 (AdaptiveRevisitHostQueueWrapper)obj;
353
354 long compTime = comp.nextReadyTime;
355
356 if(nextReadyTime>compTime){
357 return 1;
358 } else if(nextReadyTime<compTime){
359 return -1;
360 } else {
361
362 return hq.getHostName().compareTo(comp.hq.getHostName());
363 }
364 }
365 }
366
367 /***
368 * Closes all HQs and the Environment.
369 */
370 public void close(){
371 Iterator it = sortedHostQueues.iterator();
372 while(it.hasNext()){
373 AdaptiveRevisitHostQueue hq =
374 ((AdaptiveRevisitHostQueueWrapper)it.next()).hq;
375 try {
376 hq.close();
377 } catch (IOException e) {
378 logger.severe("IOException while closing " + hq.getHostName() +
379 "\n" + e.getMessage());
380 }
381 }
382 try {
383 hostNamesDB.close();
384 } catch (DatabaseException e) {
385 logger.severe("IOException while closing hostNamesDB" +
386 "\n" + e.getMessage());
387 }
388 }
389
390
391
392
393
394 public String[] getReports() {
395
396 return new String[] {};
397 }
398
399
400
401
402 public String singleLineReport() {
403 return ArchiveUtils.singleLineReport(this);
404 }
405
406
407
408
409 public void reportTo(PrintWriter writer) {
410 Iterator it = sortedHostQueues.iterator();
411 while(it.hasNext()){
412 AdaptiveRevisitHostQueueWrapper wrapper =
413 (AdaptiveRevisitHostQueueWrapper)it.next();
414
415 writer.print(wrapper.hq.report(10));
416 writer.print("\n\n");
417 }
418 }
419
420 public void reportTo(String name, PrintWriter writer) {
421 if(name==null || hostQueues.containsKey(name)==false){
422 reportTo(writer);
423 } else {
424 AdaptiveRevisitHostQueueWrapper wrapper =
425 (AdaptiveRevisitHostQueueWrapper)hostQueues.get(name);
426
427 writer.print(wrapper.hq.report(0));
428 writer.print("\n\n");
429 }
430 }
431
432 public void singleLineReportTo(PrintWriter writer) {
433 Iterator it = sortedHostQueues.iterator();
434 int total = 0;
435 int ready = 0;
436 int snoozed = 0;
437 int empty = 0;
438 int busy = 0;
439 while(it.hasNext()){
440 AdaptiveRevisitHostQueueWrapper wrapper =
441 (AdaptiveRevisitHostQueueWrapper)it.next();
442 total++;
443 switch(wrapper.hq.getState()){
444 case AdaptiveRevisitHostQueue.HQSTATE_BUSY : busy++; break;
445 case AdaptiveRevisitHostQueue.HQSTATE_EMPTY : empty++; break;
446 case AdaptiveRevisitHostQueue.HQSTATE_READY : ready++; break;
447 case AdaptiveRevisitHostQueue.HQSTATE_SNOOZED : snoozed++; break;
448 }
449 }
450 writer.print(total + " queues: " + ready + " ready, " + snoozed +
451 " snoozed, " + busy + " busy, and " + empty + " empty");
452 }
453
454
455
456
457 public String singleLineLegend() {
458 return "total ready snoozed busy empty";
459 }
460
461 }