1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.archive.crawler.prefetch;
24
25 import java.util.logging.Level;
26 import java.util.logging.Logger;
27
28 import org.archive.crawler.datamodel.CoreAttributeConstants;
29 import org.archive.crawler.datamodel.CrawlSubstats;
30 import org.archive.crawler.datamodel.CrawlURI;
31 import org.archive.crawler.datamodel.FetchStatusCodes;
32 import org.archive.crawler.framework.Processor;
33 import org.archive.crawler.settings.SimpleType;
34
35 /***
36 * A simple quota enforcer. If the host, server, or frontier group
37 * associated with the current CrawlURI is already over its quotas,
38 * blocks the current URI's processing with S_BLOCKED_BY_QUOTA.
39 *
40 * @author gojomo
41 * @version $Date: 2007-04-06 00:40:50 +0000 (Fri, 06 Apr 2007) $, $Revision: 5040 $
42 */
43 public class QuotaEnforcer extends Processor implements FetchStatusCodes {
44
45 private static final long serialVersionUID = 6091720623469404595L;
46
47 private final Logger LOGGER = Logger.getLogger(this.getClass().getName());
48
49
50 protected static final int SERVER = 0;
51 protected static final int HOST = 1;
52 protected static final int GROUP = 2;
53 protected static final int NAME = 0;
54 protected static final int SUCCESSES = 1;
55 protected static final int SUCCESS_KB = 2;
56 protected static final int RESPONSES = 3;
57 protected static final int RESPONSE_KB = 4;
58 protected static final String[][] keys = new String[][] {
59 {
60 "server",
61 "server-max-fetch-successes",
62 "server-max-success-kb",
63 "server-max-fetch-responses",
64 "server-max-all-kb"
65 },
66 {
67 "host",
68 "host-max-fetch-successes",
69 "host-max-success-kb",
70 "host-max-fetch-responses",
71 "host-max-all-kb"
72 },
73 {
74 "group",
75 "group-max-fetch-successes",
76 "group-max-success-kb",
77 "group-max-fetch-responses",
78 "group-max-all-kb"
79 }
80 };
81
82
83
84 /*** server max successful fetches */
85 protected static final String ATTR_SERVER_MAX_FETCH_SUCCESSES =
86 keys[SERVER][SUCCESSES];
87 protected static final Long DEFAULT_SERVER_MAX_FETCH_SUCCESSES =
88 new Long(-1);
89 /*** server max successful fetch bytes */
90 protected static final String ATTR_SERVER_MAX_SUCCESS_KB =
91 keys[SERVER][SUCCESS_KB];;
92 protected static final Long DEFAULT_SERVER_MAX_SUCCESS_KB =
93 new Long(-1);
94
95 /*** server max fetch responses (including error codes) */
96 protected static final String ATTR_SERVER_MAX_FETCH_RESPONSES =
97 keys[SERVER][RESPONSES];
98 protected static final Long DEFAULT_SERVER_MAX_FETCH_RESPONSES =
99 new Long(-1);
100 /*** server max all fetch bytes (including error responses) */
101 protected static final String ATTR_SERVER_MAX_ALL_KB =
102 keys[SERVER][RESPONSE_KB];
103 protected static final Long DEFAULT_SERVER_MAX_ALL_KB =
104 new Long(-1);
105
106
107
108 /*** host max successful fetches */
109 protected static final String ATTR_HOST_MAX_FETCH_SUCCESSES =
110 keys[HOST][SUCCESSES];;
111 protected static final Long DEFAULT_HOST_MAX_FETCH_SUCCESSES =
112 new Long(-1);
113 /*** host max successful fetch bytes */
114 protected static final String ATTR_HOST_MAX_SUCCESS_KB =
115 keys[HOST][SUCCESS_KB];;
116 protected static final Long DEFAULT_HOST_MAX_SUCCESS_KB =
117 new Long(-1);
118
119 /*** host max fetch responses (including error codes) */
120 protected static final String ATTR_HOST_MAX_FETCH_RESPONSES =
121 keys[HOST][RESPONSES];
122 protected static final Long DEFAULT_HOST_MAX_FETCH_RESPONSES =
123 new Long(-1);
124 /*** host max all fetch bytes (including error responses) */
125 protected static final String ATTR_HOST_MAX_ALL_KB =
126 keys[HOST][RESPONSE_KB];
127 protected static final Long DEFAULT_HOST_MAX_ALL_KB =
128 new Long(-1);
129
130
131
132 /*** group max successful fetches */
133 protected static final String ATTR_GROUP_MAX_FETCH_SUCCESSES =
134 keys[GROUP][SUCCESSES];
135 protected static final Long DEFAULT_GROUP_MAX_FETCH_SUCCESSES =
136 new Long(-1);
137 /*** group max successful fetch bytes */
138 protected static final String ATTR_GROUP_MAX_SUCCESS_KB =
139 keys[GROUP][SUCCESS_KB];
140 protected static final Long DEFAULT_GROUP_MAX_SUCCESS_KB =
141 new Long(-1);
142
143 /*** group max fetch responses (including error codes) */
144 protected static final String ATTR_GROUP_MAX_FETCH_RESPONSES =
145 keys[GROUP][RESPONSES];
146 protected static final Long DEFAULT_GROUP_MAX_FETCH_RESPONSES =
147 new Long(-1);
148 /*** group max all fetch bytes (including error responses) */
149 protected static final String ATTR_GROUP_MAX_ALL_KB =
150 keys[GROUP][RESPONSE_KB];
151 protected static final Long DEFAULT_GROUP_MAX_ALL_KB =
152 new Long(-1);
153
154 /*** whether to force-retire when over-quote detected */
155 protected static final String ATTR_FORCE_RETIRE =
156 "force-retire";
157 protected static final Boolean DEFAULT_FORCE_RETIRE = true;
158
159 /***
160 * Constructor.
161 * @param name Name of this processor.
162 */
163 public QuotaEnforcer(String name) {
164 super(name, "QuotaEnforcer.");
165
166 addElementToDefinition(new SimpleType(ATTR_FORCE_RETIRE,
167 "Whether an over-quota situation should result in the " +
168 "containing queue being force-retired (if the Frontier " +
169 "supports this). Note that if your queues combine URIs " +
170 "that are different with regard to the quota category, " +
171 "the retirement may hold back URIs not in the same " +
172 "quota category. " +
173 "Default is false.",
174 DEFAULT_FORCE_RETIRE));
175
176 String maxFetchSuccessesDesc = "Maximum number of fetch successes " +
177 "(e.g. 200 responses) to collect from one CATEGORY. " +
178 "Default is -1, meaning no limit.";
179 String maxSuccessKbDesc = "Maximum amount of fetch success content " +
180 "(e.g. 200 responses) in KB to collect from one CATEGORY. " +
181 "Default is -1, meaning no limit.";
182 String maxFetchResponsesDesc = "Maximum number of fetch responses " +
183 "(incl. error responses) to collect from one CATEGORY. " +
184 "Default is -1, meaning no limit.";
185 String maxAllKbDesc = "Maximum amount of response content " +
186 "(incl. error responses) in KB to collect from one CATEGORY. " +
187 "Default is -1, meaning no limit.";
188
189 addElementToDefinition(new SimpleType(ATTR_SERVER_MAX_FETCH_SUCCESSES,
190 maxFetchSuccessesDesc.replaceAll("CATEGORY","server"),
191 DEFAULT_SERVER_MAX_FETCH_SUCCESSES));
192 addElementToDefinition(new SimpleType(ATTR_SERVER_MAX_SUCCESS_KB,
193 maxSuccessKbDesc.replaceAll("CATEGORY","server"),
194 DEFAULT_SERVER_MAX_SUCCESS_KB));
195
196 addElementToDefinition(new SimpleType(ATTR_SERVER_MAX_FETCH_RESPONSES,
197 maxFetchResponsesDesc.replaceAll("CATEGORY","server"),
198 DEFAULT_SERVER_MAX_FETCH_RESPONSES));
199 addElementToDefinition(new SimpleType(ATTR_SERVER_MAX_ALL_KB,
200 maxAllKbDesc.replaceAll("CATEGORY","server"),
201 DEFAULT_SERVER_MAX_ALL_KB));
202
203 addElementToDefinition(new SimpleType(ATTR_HOST_MAX_FETCH_SUCCESSES,
204 maxFetchSuccessesDesc.replaceAll("CATEGORY","host"),
205 DEFAULT_HOST_MAX_FETCH_SUCCESSES));
206 addElementToDefinition(new SimpleType(ATTR_HOST_MAX_SUCCESS_KB,
207 maxSuccessKbDesc.replaceAll("CATEGORY","host"),
208 DEFAULT_HOST_MAX_SUCCESS_KB));
209
210 addElementToDefinition(new SimpleType(ATTR_HOST_MAX_FETCH_RESPONSES,
211 maxFetchResponsesDesc.replaceAll("CATEGORY","host"),
212 DEFAULT_HOST_MAX_FETCH_RESPONSES));
213 addElementToDefinition(new SimpleType(ATTR_HOST_MAX_ALL_KB,
214 maxAllKbDesc.replaceAll("CATEGORY","host"),
215 DEFAULT_HOST_MAX_ALL_KB));
216
217 addElementToDefinition(new SimpleType(ATTR_GROUP_MAX_FETCH_SUCCESSES,
218 maxFetchSuccessesDesc.replaceAll("CATEGORY","group (queue)"),
219 DEFAULT_GROUP_MAX_FETCH_SUCCESSES));
220 addElementToDefinition(new SimpleType(ATTR_GROUP_MAX_SUCCESS_KB,
221 maxSuccessKbDesc.replaceAll("CATEGORY","group (queue)"),
222 DEFAULT_GROUP_MAX_SUCCESS_KB));
223
224 addElementToDefinition(new SimpleType(ATTR_GROUP_MAX_FETCH_RESPONSES,
225 maxFetchResponsesDesc.replaceAll("CATEGORY","group (queue)"),
226 DEFAULT_GROUP_MAX_FETCH_RESPONSES));
227 addElementToDefinition(new SimpleType(ATTR_GROUP_MAX_ALL_KB,
228 maxAllKbDesc.replaceAll("CATEGORY","group (queue)"),
229 DEFAULT_GROUP_MAX_ALL_KB));
230
231 }
232
233 protected void innerProcess(CrawlURI curi) {
234 CrawlSubstats.HasCrawlSubstats[] haveStats =
235 new CrawlSubstats.HasCrawlSubstats[] {
236 getController().getServerCache().getServerFor(curi),
237 getController().getServerCache().getHostFor(curi),
238 getController().getFrontier().getGroup(curi)
239 };
240
241 for(int cat = SERVER; cat <= GROUP; cat++) {
242 if (checkQuotas(curi, haveStats[cat], cat)) {
243 return;
244 }
245 }
246 }
247
248 /***
249 * Check all quotas for the given substats and category (server, host, or
250 * group).
251 *
252 * @param curi CrawlURI to mark up with results
253 * @param hasStats holds CrawlSubstats with actual values to test
254 * @param CAT category index (SERVER, HOST, GROUP) to quota settings keys
255 * @return true if quota precludes fetching of CrawlURI
256 */
257 protected boolean checkQuotas(final CrawlURI curi,
258 final CrawlSubstats.HasCrawlSubstats hasStats,
259 final int CAT) {
260 if (hasStats == null) {
261 if (LOGGER.isLoggable(Level.FINE)) {
262 LOGGER.fine(curi.toString() + " null stats category: " + CAT);
263 }
264 return false;
265 }
266 CrawlSubstats substats = hasStats.getSubstats();
267 long[] actuals = new long[] {
268 -1,
269 substats.getFetchSuccesses(),
270 substats.getSuccessBytes()/1024,
271 substats.getFetchResponses(),
272 substats.getTotalBytes()/1024,
273 };
274 for(int q = SUCCESSES; q <= RESPONSE_KB; q++) {
275 if(applyQuota(curi, keys[CAT][q], actuals[q])) {
276 return true;
277 }
278 }
279 return false;
280 }
281
282 /***
283 * Apply the quota specified by the given key against the actual
284 * value provided. If the quota and actual values rule out processing the
285 * given CrawlURI, mark up the CrawlURI appropriately.
286 *
287 * @param curi CrawlURI whose processing is subject to a potential quota
288 * limitation
289 * @param quotaKey settings key to get applicable quota
290 * @param actual current value to compare to quota
291 * @return true is CrawlURI is blocked by a quota, false otherwise
292 */
293 protected boolean applyQuota(CrawlURI curi, String quotaKey, long actual) {
294 long quota = ((Long)getUncheckedAttribute(curi, quotaKey)).longValue();
295 if (quota >= 0 && actual >= quota) {
296 curi.setFetchStatus(S_BLOCKED_BY_QUOTA);
297 curi.addAnnotation("Q:"+quotaKey);
298 curi.skipToProcessorChain(getController().getPostprocessorChain());
299 if((Boolean)getUncheckedAttribute(curi,ATTR_FORCE_RETIRE)) {
300 curi.putObject(CoreAttributeConstants.A_FORCE_RETIRE, (Boolean) true);
301 }
302 return true;
303 }
304 return false;
305 }
306 }