View Javadoc

1   /* BdbWorkQueue
2    * 
3    * Created on Dec 24, 2004
4    *
5    * Copyright (C) 2004 Internet Archive.
6    * 
7    * This file is part of the Heritrix web crawler (crawler.archive.org).
8    * 
9    * Heritrix is free software; you can redistribute it and/or modify
10   * it under the terms of the GNU Lesser Public License as published by
11   * the Free Software Foundation; either version 2.1 of the License, or
12   * any later version.
13   * 
14   * Heritrix is distributed in the hope that it will be useful, 
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   * GNU Lesser Public License for more details.
18   * 
19   * You should have received a copy of the GNU Lesser Public License
20   * along with Heritrix; if not, write to the Free Software
21   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22   */
23  package org.archive.crawler.frontier;
24  
25  import java.io.IOException;
26  import java.io.Serializable;
27  import java.io.UnsupportedEncodingException;
28  import java.util.logging.Level;
29  import java.util.logging.Logger;
30  
31  import org.archive.crawler.datamodel.CrawlURI;
32  import org.archive.util.ArchiveUtils;
33  import org.archive.util.IoUtils;
34  
35  import com.sleepycat.je.DatabaseEntry;
36  import com.sleepycat.je.DatabaseException;
37  
38  
39  /***
40   * One independent queue of items with the same 'classKey' (eg host).
41   * @author gojomo
42   */
43  public class BdbWorkQueue extends WorkQueue
44  implements Comparable, Serializable {
45      private static Logger LOGGER =
46          Logger.getLogger(BdbWorkQueue.class.getName());
47      
48      // be robust against trivial implementation changes
49      private static final long serialVersionUID = ArchiveUtils
50          .classnameBasedUID(BdbWorkQueue.class, 1);
51  
52      /***
53       * All items in this queue have this same 'origin'
54       * prefix to their keys.
55       */
56      private byte[] origin;
57  
58      /***
59       * Create a virtual queue inside the given BdbMultipleWorkQueues 
60       * 
61       * @param classKey
62       */
63      public BdbWorkQueue(String classKey, BdbFrontier frontier) {
64          super(classKey);
65          this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);
66          if (LOGGER.isLoggable(Level.FINE)) {
67              LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);
68          }
69          // add the queue-front 'cap' entry; see...
70          // http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
71          frontier.getWorkQueues().addCap(origin);
72      }
73  
74      protected long deleteMatchingFromQueue(final WorkQueueFrontier frontier,
75              final String match) throws IOException {
76          try {
77              final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
78                  .getWorkQueues();
79              return queues.deleteMatchingFromQueue(match, classKey,
80                  new DatabaseEntry(origin));
81          } catch (DatabaseException e) {
82              throw IoUtils.wrapAsIOException(e);
83          }
84      }
85  
86      protected void deleteItem(final WorkQueueFrontier frontier,
87              final CrawlURI peekItem) throws IOException {
88          try {
89              final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
90                  .getWorkQueues();
91               queues.delete(peekItem);
92          } catch (DatabaseException e) {
93              e.printStackTrace();
94              throw IoUtils.wrapAsIOException(e);
95          }
96      }
97  
98      protected CrawlURI peekItem(final WorkQueueFrontier frontier)
99      throws IOException {
100         final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
101             .getWorkQueues();
102         DatabaseEntry key = new DatabaseEntry(origin);
103         CrawlURI curi = null;
104         int tries = 1;
105         while(true) {
106             try {
107                 curi = queues.get(key);
108             } catch (DatabaseException e) {
109                 LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
110             }
111             
112             // ensure CrawlURI, if any,  came from acceptable range: 
113             if(!ArchiveUtils.startsWith(key.getData(),origin)) {
114                 LOGGER.severe(
115                     "inconsistency: "+classKey+"("+
116                     getPrefixClassKey(origin)+") with " + getCount() + " items gave "
117                     + curi +"("+getPrefixClassKey(key.getData()));
118                 // clear curi to allow retry
119                 curi = null; 
120                 // reset key to original origin for retry
121                 key.setData(origin);
122             }
123             
124             if (curi!=null) {
125                 // success
126                 break;
127             }
128             
129             if (tries>3) {
130                 LOGGER.severe("no item where expected in queue "+classKey);
131                 break;
132             }
133             tries++;
134             LOGGER.severe("Trying get #" + Integer.toString(tries)
135                     + " in queue " + classKey + " with " + getCount()
136                     + " items using key "
137                     + getPrefixClassKey(key.getData()));
138         }
139  
140         return curi;
141     }
142 
143     protected void insertItem(final WorkQueueFrontier frontier,
144             final CrawlURI curi, boolean overwriteIfPresent) throws IOException {
145         try {
146             final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
147                 .getWorkQueues();
148             queues.put(curi, overwriteIfPresent);
149             if (LOGGER.isLoggable(Level.FINE)) {
150                 LOGGER.fine("Inserted into " + getPrefixClassKey(this.origin) +
151                     " (count " + Long.toString(getCount())+ "): " +
152                         curi.toString());
153             }
154         } catch (DatabaseException e) {
155             throw IoUtils.wrapAsIOException(e);
156         }
157     }
158     
159     /***
160      * @param byteArray Byte array to get hex string of.
161      * @return Hex string of passed in byte array (Used logging
162      * key-prefixes).
163      */
164     protected static String getPrefixClassKey(final byte [] byteArray) {
165         int zeroIndex = 0;
166         while(byteArray[zeroIndex]!=0) {
167             zeroIndex++;
168         }
169         try {
170             return new String(byteArray,0,zeroIndex,"UTF-8");
171         } catch (UnsupportedEncodingException e) {
172             // should be impossible; UTF-8 always available
173             e.printStackTrace();
174             return e.getMessage();
175         }
176     }
177 }