1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 package org.archive.queue;
26
27 import java.io.Serializable;
28 import java.util.AbstractQueue;
29 import java.util.Iterator;
30 import java.util.concurrent.atomic.AtomicLong;
31 import java.util.logging.Logger;
32
33 import com.sleepycat.bind.EntryBinding;
34 import com.sleepycat.bind.serial.SerialBinding;
35 import com.sleepycat.bind.serial.StoredClassCatalog;
36 import com.sleepycat.bind.tuple.TupleBinding;
37 import com.sleepycat.collections.StoredSortedMap;
38 import com.sleepycat.je.Database;
39 import com.sleepycat.je.DatabaseConfig;
40 import com.sleepycat.je.DatabaseException;
41
42 /***
43 * Queue backed by a JE Collections StoredSortedMap.
44 *
45 * @author gojomo
46 *
47 * @param <E>
48 */
49 public class StoredQueue<E extends Serializable> extends AbstractQueue<E> implements Serializable {
50 private static final long serialVersionUID = 1L;
51 private static final Logger logger =
52 Logger.getLogger(StoredQueue.class.getName());
53
54 transient StoredSortedMap queueMap;
55 transient Database queueDb;
56 AtomicLong tailIndex;
57 AtomicLong headIndex;
58
59 /***
60 * Create a StoredQueue backed by the given Database.
61 *
62 * The Class of values to be queued may be provided; there is only a
63 * benefit when a primitive type is specified. A StoredClassCatalog
64 * must be provided if a primitive type is not supplied.
65 *
66 * @param db
67 * @param clsOrNull
68 * @param classCatalog
69 */
70 public StoredQueue(Database db, Class clsOrNull, StoredClassCatalog classCatalog) {
71 tailIndex = new AtomicLong(0);
72 headIndex = new AtomicLong(0);
73 hookupDatabase(db, clsOrNull, classCatalog);
74 }
75
76 /***
77 * @param db
78 * @param clsOrNull
79 * @param classCatalog
80 */
81 public void hookupDatabase(Database db, Class clsOrNull, StoredClassCatalog classCatalog) {
82 EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(clsOrNull);
83 if(valueBinding == null) {
84 valueBinding = new SerialBinding(classCatalog, clsOrNull);
85 }
86 queueDb = db;
87 queueMap = new StoredSortedMap(
88 db,
89 TupleBinding.getPrimitiveBinding(Long.class),
90 valueBinding,
91 true);
92 }
93
94 @SuppressWarnings("unchecked")
95 @Override
96 public Iterator<E> iterator() {
97 return queueMap.values().iterator();
98 }
99
100 @Override
101 public int size() {
102 synchronized(tailIndex) {
103 synchronized(headIndex) {
104 return (int)(tailIndex.get()-headIndex.get());
105 }
106 }
107
108 }
109
110 public boolean offer(E o) {
111 synchronized (tailIndex) {
112 queueMap.put(tailIndex.getAndIncrement(), o);
113 }
114 return true;
115 }
116
117 @SuppressWarnings("unchecked")
118 public E peek() {
119 synchronized (headIndex) {
120 E head = null;
121 while(head == null && headIndex.get() < tailIndex.get()) {
122 head = (E) queueMap.get(headIndex.get());
123 if(head != null) {
124 return head;
125 }
126
127 logger.severe("unexpected empty index of StoredQueue: "
128 + headIndex.get() + " (tailIndex: "
129 + tailIndex.get());
130 headIndex.incrementAndGet();
131 }
132 return head;
133 }
134 }
135
136 @SuppressWarnings("unchecked")
137 public E poll() {
138 synchronized (headIndex) {
139 E head = peek();
140 if(head!=null) {
141 return (E) queueMap.remove(headIndex.getAndIncrement());
142 } else {
143 return null;
144 }
145 }
146 }
147
148 /***
149 * A suitable DatabaseConfig for the Database backing a StoredQueue.
150 * (However, it is not necessary to use these config options.)
151 *
152 * @return DatabaseConfig suitable for queue
153 */
154 public static DatabaseConfig databaseConfig() {
155 DatabaseConfig dbConfig = new DatabaseConfig();
156 dbConfig.setTransactional(false);
157 dbConfig.setAllowCreate(true);
158 dbConfig.setDeferredWrite(true);
159 return dbConfig;
160 }
161
162 /***
163 * Save the state to a stream (that is, serialize it).
164 *
165 * @serialData The capacity is emitted (int), followed by all of
166 * its elements (each an <tt>Object</tt>) in the proper order,
167 * followed by a null
168 * @param s the stream
169 */
170 private void writeObject(java.io.ObjectOutputStream s)
171 throws java.io.IOException {
172 try {
173 queueDb.sync();
174 } catch (DatabaseException e) {
175 throw new RuntimeException(e);
176 }
177 s.defaultWriteObject();
178 }
179
180 public void close() {
181 try {
182 queueDb.sync();
183 queueDb.close();
184 } catch (DatabaseException e) {
185 throw new RuntimeException(e);
186 }
187 }
188 }