1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.frontier;
24
25 import java.io.IOException;
26 import java.io.Serializable;
27 import java.io.UnsupportedEncodingException;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30
31 import org.archive.crawler.datamodel.CrawlURI;
32 import org.archive.util.ArchiveUtils;
33 import org.archive.util.IoUtils;
34
35 import com.sleepycat.je.DatabaseEntry;
36 import com.sleepycat.je.DatabaseException;
37
38
39 /***
40 * One independent queue of items with the same 'classKey' (eg host).
41 * @author gojomo
42 */
43 public class BdbWorkQueue extends WorkQueue
44 implements Comparable, Serializable {
45 private static Logger LOGGER =
46 Logger.getLogger(BdbWorkQueue.class.getName());
47
48
49 private static final long serialVersionUID = ArchiveUtils
50 .classnameBasedUID(BdbWorkQueue.class, 1);
51
52 /***
53 * All items in this queue have this same 'origin'
54 * prefix to their keys.
55 */
56 private byte[] origin;
57
58 /***
59 * Create a virtual queue inside the given BdbMultipleWorkQueues
60 *
61 * @param classKey
62 */
63 public BdbWorkQueue(String classKey, BdbFrontier frontier) {
64 super(classKey);
65 this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);
66 if (LOGGER.isLoggable(Level.FINE)) {
67 LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);
68 }
69
70
71 frontier.getWorkQueues().addCap(origin);
72 }
73
74 protected long deleteMatchingFromQueue(final WorkQueueFrontier frontier,
75 final String match) throws IOException {
76 try {
77 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
78 .getWorkQueues();
79 return queues.deleteMatchingFromQueue(match, classKey,
80 new DatabaseEntry(origin));
81 } catch (DatabaseException e) {
82 throw IoUtils.wrapAsIOException(e);
83 }
84 }
85
86 protected void deleteItem(final WorkQueueFrontier frontier,
87 final CrawlURI peekItem) throws IOException {
88 try {
89 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
90 .getWorkQueues();
91 queues.delete(peekItem);
92 } catch (DatabaseException e) {
93 e.printStackTrace();
94 throw IoUtils.wrapAsIOException(e);
95 }
96 }
97
98 protected CrawlURI peekItem(final WorkQueueFrontier frontier)
99 throws IOException {
100 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
101 .getWorkQueues();
102 DatabaseEntry key = new DatabaseEntry(origin);
103 CrawlURI curi = null;
104 int tries = 1;
105 while(true) {
106 try {
107 curi = queues.get(key);
108 } catch (DatabaseException e) {
109 LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
110 }
111
112
113 if(!ArchiveUtils.startsWith(key.getData(),origin)) {
114 LOGGER.severe(
115 "inconsistency: "+classKey+"("+
116 getPrefixClassKey(origin)+") with " + getCount() + " items gave "
117 + curi +"("+getPrefixClassKey(key.getData()));
118
119 curi = null;
120
121 key.setData(origin);
122 }
123
124 if (curi!=null) {
125
126 break;
127 }
128
129 if (tries>3) {
130 LOGGER.severe("no item where expected in queue "+classKey);
131 break;
132 }
133 tries++;
134 LOGGER.severe("Trying get #" + Integer.toString(tries)
135 + " in queue " + classKey + " with " + getCount()
136 + " items using key "
137 + getPrefixClassKey(key.getData()));
138 }
139
140 return curi;
141 }
142
143 protected void insertItem(final WorkQueueFrontier frontier,
144 final CrawlURI curi, boolean overwriteIfPresent) throws IOException {
145 try {
146 final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
147 .getWorkQueues();
148 queues.put(curi, overwriteIfPresent);
149 if (LOGGER.isLoggable(Level.FINE)) {
150 LOGGER.fine("Inserted into " + getPrefixClassKey(this.origin) +
151 " (count " + Long.toString(getCount())+ "): " +
152 curi.toString());
153 }
154 } catch (DatabaseException e) {
155 throw IoUtils.wrapAsIOException(e);
156 }
157 }
158
159 /***
160 * @param byteArray Byte array to get hex string of.
161 * @return Hex string of passed in byte array (Used logging
162 * key-prefixes).
163 */
164 protected static String getPrefixClassKey(final byte [] byteArray) {
165 int zeroIndex = 0;
166 while(byteArray[zeroIndex]!=0) {
167 zeroIndex++;
168 }
169 try {
170 return new String(byteArray,0,zeroIndex,"UTF-8");
171 } catch (UnsupportedEncodingException e) {
172
173 e.printStackTrace();
174 return e.getMessage();
175 }
176 }
177 }