View Javadoc

1   /* StoredQueue.java
2    *
3    * $Id: BloomFilter32bitSplit.java 5197 2007-06-06 01:31:46Z gojomo $
4    *
5    * Created on Jun 14, 2007
6    *
7    * Copyright (C) 2007 Internet Archive
8    *
9    * This file is part of the Heritrix web crawler (crawler.archive.org).
10   *
11   * Heritrix is free software; you can redistribute it and/or modify
12   * it under the terms of the GNU Lesser Public License as published by
13   * the Free Software Foundation; either version 2.1 of the License, or
14   * any later version.
15   *
16   * Heritrix is distributed in the hope that it will be useful,
17   * but WITHOUT ANY WARRANTY; without even the implied warranty of
18   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19   * GNU Lesser Public License for more details.
20   *
21   * You should have received a copy of the GNU Lesser Public License
22   * along with Heritrix; if not, write to the Free Software
23   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
24   */
25  package org.archive.queue;
26  
27  import java.io.Serializable;
28  import java.util.AbstractQueue;
29  import java.util.Iterator;
30  import java.util.concurrent.atomic.AtomicLong;
31  import java.util.logging.Logger;
32  
33  import com.sleepycat.bind.EntryBinding;
34  import com.sleepycat.bind.serial.SerialBinding;
35  import com.sleepycat.bind.serial.StoredClassCatalog;
36  import com.sleepycat.bind.tuple.TupleBinding;
37  import com.sleepycat.collections.StoredSortedMap;
38  import com.sleepycat.je.Database;
39  import com.sleepycat.je.DatabaseConfig;
40  import com.sleepycat.je.DatabaseException;
41  
42  /***
43   * Queue backed by a JE Collections StoredSortedMap. 
44   * 
45   * @author gojomo
46   *
47   * @param <E>
48   */
49  public class StoredQueue<E extends Serializable> extends AbstractQueue<E>  implements Serializable {
50      private static final long serialVersionUID = 1L;
51      private static final Logger logger =
52          Logger.getLogger(StoredQueue.class.getName());
53  
54      transient StoredSortedMap queueMap; // Long -> E
55      transient Database queueDb; // Database
56      AtomicLong tailIndex; // next spot for insert
57      AtomicLong headIndex; // next spot for read
58   
59      /***
60       * Create a StoredQueue backed by the given Database. 
61       * 
62       * The Class of values to be queued may be provided; there is only a 
63       * benefit when a primitive type is specified. A StoredClassCatalog
64       * must be provided if a primitive type is not supplied. 
65       * 
66       * @param db
67       * @param clsOrNull 
68       * @param classCatalog
69       */
70      public StoredQueue(Database db, Class clsOrNull, StoredClassCatalog classCatalog) {
71          tailIndex = new AtomicLong(0);
72          headIndex = new AtomicLong(0);
73          hookupDatabase(db, clsOrNull, classCatalog);
74      }
75  
76      /***
77       * @param db
78       * @param clsOrNull
79       * @param classCatalog
80       */
81      public void hookupDatabase(Database db, Class clsOrNull, StoredClassCatalog classCatalog) {
82          EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(clsOrNull);
83          if(valueBinding == null) {
84              valueBinding = new SerialBinding(classCatalog, clsOrNull);
85          }
86          queueDb = db;
87          queueMap = new StoredSortedMap(
88                  db,
89                  TupleBinding.getPrimitiveBinding(Long.class),
90                  valueBinding,
91                  true);
92      }
93  
94      @SuppressWarnings("unchecked")
95      @Override
96      public Iterator<E> iterator() {
97          return queueMap.values().iterator();
98      }
99  
100     @Override
101     public int size() {
102         synchronized(tailIndex) {
103             synchronized(headIndex) {
104                 return (int)(tailIndex.get()-headIndex.get());
105             }
106         }
107         
108     }
109 
110     public boolean offer(E o) {
111         synchronized (tailIndex) {
112             queueMap.put(tailIndex.getAndIncrement(), o);
113         }
114         return true;
115     }
116 
117     @SuppressWarnings("unchecked")
118     public E peek() {
119         synchronized (headIndex) {
120             E head = null;
121             while(head == null && headIndex.get() < tailIndex.get()) {
122                 head = (E) queueMap.get(headIndex.get());
123                 if(head != null) {
124                     return head;
125                 }
126                 // ERROR; should never be null with headIndex < tailIndex
127                 logger.severe("unexpected empty index of StoredQueue: "
128                         + headIndex.get() + " (tailIndex: " 
129                         + tailIndex.get());
130                 headIndex.incrementAndGet();
131             }
132             return head;
133         }
134     }
135 
136     @SuppressWarnings("unchecked")
137     public E poll() {
138         synchronized (headIndex) {
139             E head = peek();
140             if(head!=null) {
141                 return (E) queueMap.remove(headIndex.getAndIncrement());
142             } else {
143                 return null;
144             }
145         }
146     }
147 
148     /***
149      * A suitable DatabaseConfig for the Database backing a StoredQueue. 
150      * (However, it is not necessary to use these config options.)
151      * 
152      * @return DatabaseConfig suitable for queue
153      */
154     public static DatabaseConfig databaseConfig() {
155         DatabaseConfig dbConfig = new DatabaseConfig();
156         dbConfig.setTransactional(false);
157         dbConfig.setAllowCreate(true);
158         dbConfig.setDeferredWrite(true);
159         return dbConfig;
160     }
161     
162     /***
163      * Save the state to a stream (that is, serialize it).
164      *
165      * @serialData The capacity is emitted (int), followed by all of
166      * its elements (each an <tt>Object</tt>) in the proper order,
167      * followed by a null
168      * @param s the stream
169      */
170     private void writeObject(java.io.ObjectOutputStream s)
171         throws java.io.IOException {
172         try {
173             queueDb.sync();
174         } catch (DatabaseException e) {
175             throw new RuntimeException(e); 
176         } 
177         s.defaultWriteObject();
178     }
179 
180     public void close() {
181         try {
182             queueDb.sync();
183             queueDb.close();
184         } catch (DatabaseException e) {
185             throw new RuntimeException(e);
186         }
187     }
188 }