001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.transaction.manager;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025
026 import javax.transaction.*;
027 import javax.transaction.xa.XAException;
028 import javax.transaction.xa.Xid;
029
030 import java.util.concurrent.ConcurrentHashMap;
031 import java.util.concurrent.CopyOnWriteArrayList;
032 import java.util.concurrent.atomic.AtomicLong;
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.geronimo.transaction.log.UnrecoverableLog;
036
037 /**
038 * Simple implementation of a transaction manager.
039 *
040 * @version $Rev: 584554 $ $Date: 2007-10-14 17:19:58 +0200 (Sun, 14 Oct 2007) $
041 */
042 public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager {
043 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
044 protected static final int DEFAULT_TIMEOUT = 600;
045 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
046
047 final TransactionLog transactionLog;
048 final XidFactory xidFactory;
049 private final int defaultTransactionTimeoutMilliseconds;
050 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
051 private final ThreadLocal threadTx = new ThreadLocal();
052 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
053 private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
054 final Recovery recovery;
055 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
056 private List recoveryErrors = new ArrayList();
057 // statistics
058 private AtomicLong totalCommits = new AtomicLong(0);
059 private AtomicLong totalRollBacks = new AtomicLong(0);
060 private AtomicLong activeCount = new AtomicLong(0);
061
062 public TransactionManagerImpl() throws XAException {
063 this(DEFAULT_TIMEOUT,
064 null,
065 null
066 );
067 }
068
069 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
070 this(defaultTransactionTimeoutSeconds,
071 null,
072 null
073 );
074 }
075
076 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
077 this(defaultTransactionTimeoutSeconds,
078 null,
079 transactionLog
080 );
081 }
082
083 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
084 if (defaultTransactionTimeoutSeconds <= 0) {
085 throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
086 }
087 this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
088
089 if (transactionLog == null) {
090 this.transactionLog = new UnrecoverableLog();
091 } else {
092 this.transactionLog = transactionLog;
093 }
094
095 if (xidFactory != null) {
096 this.xidFactory = xidFactory;
097 } else {
098 this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
099 }
100
101 recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
102 recovery.recoverLog();
103 }
104
105 public Transaction getTransaction() {
106 return (Transaction) threadTx.get();
107 }
108
109 private void associate(TransactionImpl tx) throws InvalidTransactionException {
110 if (tx == null) throw new NullPointerException("tx is null");
111
112 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
113 if (existingAssociation != null) {
114 throw new InvalidTransactionException("Specified transaction is already associated with another thread");
115 }
116 threadTx.set(tx);
117 fireThreadAssociated(tx);
118 activeCount.getAndIncrement();
119 }
120
121 private void unassociate() {
122 Transaction tx = getTransaction();
123 if (tx != null) {
124 associatedTransactions.remove(tx);
125 threadTx.set(null);
126 fireThreadUnassociated(tx);
127 activeCount.getAndDecrement();
128 }
129 }
130
131 public void setTransactionTimeout(int seconds) throws SystemException {
132 if (seconds < 0) {
133 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
134 }
135 if (seconds == 0) {
136 transactionTimeoutMilliseconds.set(null);
137 } else {
138 transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
139 }
140 }
141
142 public int getStatus() throws SystemException {
143 Transaction tx = getTransaction();
144 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
145 }
146
147 public void begin() throws NotSupportedException, SystemException {
148 begin(getTransactionTimeoutMilliseconds(0L));
149 }
150
151 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
152 if (getStatus() != Status.STATUS_NO_TRANSACTION) {
153 throw new NotSupportedException("Nested Transactions are not supported");
154 }
155 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
156 // timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
157 try {
158 associate(tx);
159 } catch (InvalidTransactionException e) {
160 // should not be possible since we just created that transaction and no one has a reference yet
161 throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e);
162 }
163 // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
164 this.transactionTimeoutMilliseconds.set(null);
165 return tx;
166 }
167
168 public Transaction suspend() throws SystemException {
169 Transaction tx = getTransaction();
170 if (tx != null) {
171 unassociate();
172 }
173 return tx;
174 }
175
176 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
177 if (getTransaction() != null) {
178 throw new IllegalStateException("Thread already associated with another transaction");
179 }
180 if (!(tx instanceof TransactionImpl)) {
181 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
182 }
183 associate((TransactionImpl) tx);
184 }
185
186 public Object getResource(Object key) {
187 TransactionImpl tx = getActiveTransactionImpl();
188 return tx.getResource(key);
189 }
190
191 private TransactionImpl getActiveTransactionImpl() {
192 TransactionImpl tx = (TransactionImpl)threadTx.get();
193 if (tx == null) {
194 throw new IllegalStateException("No tx on thread");
195 }
196 if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
197 throw new IllegalStateException("Transaction " + tx + " is not active");
198 }
199 return tx;
200 }
201
202 public boolean getRollbackOnly() {
203 TransactionImpl tx = getActiveTransactionImpl();
204 return tx.getRollbackOnly();
205 }
206
207 public Object getTransactionKey() {
208 TransactionImpl tx = getActiveTransactionImpl();
209 return tx.getTransactionKey();
210 }
211
212 public int getTransactionStatus() {
213 TransactionImpl tx = (TransactionImpl) getTransaction();
214 return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus();
215 }
216
217 public void putResource(Object key, Object value) {
218 TransactionImpl tx = getActiveTransactionImpl();
219 tx.putResource(key, value);
220 }
221
222 /**
223 * jta 1.1 method so the jpa implementations can be told to flush their caches.
224 * @param synchronization
225 */
226 public void registerInterposedSynchronization(Synchronization synchronization) {
227 TransactionImpl tx = getActiveTransactionImpl();
228 tx.registerInterposedSynchronization(synchronization);
229 }
230
231 public void setRollbackOnly() throws IllegalStateException {
232 TransactionImpl tx = (TransactionImpl) threadTx.get();
233 if (tx == null) {
234 throw new IllegalStateException("No transaction associated with current thread");
235 }
236 tx.setRollbackOnly();
237 }
238
239 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
240 Transaction tx = getTransaction();
241 if (tx == null) {
242 throw new IllegalStateException("No transaction associated with current thread");
243 }
244 try {
245 tx.commit();
246 } finally {
247 unassociate();
248 }
249 totalCommits.getAndIncrement();
250 }
251
252 public void rollback() throws IllegalStateException, SecurityException, SystemException {
253 Transaction tx = getTransaction();
254 if (tx == null) {
255 throw new IllegalStateException("No transaction associated with current thread");
256 }
257 try {
258 tx.rollback();
259 } finally {
260 unassociate();
261 }
262 totalRollBacks.getAndIncrement();
263 }
264
265 //XidImporter implementation
266 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
267 if (transactionTimeoutMilliseconds < 0) {
268 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
269 }
270 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
271 return tx;
272 }
273
274 public void commit(Transaction tx, boolean onePhase) throws XAException {
275 if (onePhase) {
276 try {
277 tx.commit();
278 } catch (HeuristicMixedException e) {
279 throw (XAException) new XAException().initCause(e);
280 } catch (HeuristicRollbackException e) {
281 throw (XAException) new XAException().initCause(e);
282 } catch (RollbackException e) {
283 throw (XAException) new XAException().initCause(e);
284 } catch (SecurityException e) {
285 throw (XAException) new XAException().initCause(e);
286 } catch (SystemException e) {
287 throw (XAException) new XAException().initCause(e);
288 }
289 } else {
290 try {
291 ((TransactionImpl) tx).preparedCommit();
292 } catch (SystemException e) {
293 throw (XAException) new XAException().initCause(e);
294 }
295 }
296 totalCommits.getAndIncrement();
297 }
298
299 public void forget(Transaction tx) throws XAException {
300 //TODO implement this!
301 }
302
303 public int prepare(Transaction tx) throws XAException {
304 try {
305 return ((TransactionImpl) tx).prepare();
306 } catch (SystemException e) {
307 throw (XAException) new XAException().initCause(e);
308 } catch (RollbackException e) {
309 throw (XAException) new XAException().initCause(e);
310 }
311 }
312
313 public void rollback(Transaction tx) throws XAException {
314 try {
315 tx.rollback();
316 } catch (IllegalStateException e) {
317 throw (XAException) new XAException().initCause(e);
318 } catch (SystemException e) {
319 throw (XAException) new XAException().initCause(e);
320 }
321 totalRollBacks.getAndIncrement();
322 }
323
324 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
325 if (transactionTimeoutMilliseconds != 0) {
326 return transactionTimeoutMilliseconds;
327 }
328 Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
329 if (timeout != null) {
330 return timeout.longValue();
331 }
332 return defaultTransactionTimeoutMilliseconds;
333 }
334
335 //Recovery
336 public void recoveryError(Exception e) {
337 recoveryLog.error(e);
338 recoveryErrors.add(e);
339 }
340
341 public void recoverResourceManager(NamedXAResource xaResource) {
342 try {
343 recovery.recoverResourceManager(xaResource);
344 } catch (XAException e) {
345 recoveryError(e);
346 }
347 }
348
349 public Map getExternalXids() {
350 return new HashMap(recovery.getExternalXids());
351 }
352
353 public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
354 transactionAssociationListeners.addIfAbsent(listener);
355 }
356
357 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
358 transactionAssociationListeners.remove(listener);
359 }
360
361 protected void fireThreadAssociated(Transaction tx) {
362 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
363 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
364 try {
365 listener.threadAssociated(tx);
366 } catch (Exception e) {
367 log.warn("Error calling transaction association listener", e);
368 }
369 }
370 }
371
372 protected void fireThreadUnassociated(Transaction tx) {
373 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
374 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
375 try {
376 listener.threadUnassociated(tx);
377 } catch (Exception e) {
378 log.warn("Error calling transaction association listener", e);
379 }
380 }
381 }
382
383 /**
384 * Returns the number of active transactions.
385 */
386 public long getActiveCount() {
387 return activeCount.longValue();
388 }
389
390 /**
391 * Return the number of total commits
392 */
393 public long getTotalCommits() {
394 return totalCommits.longValue();
395 }
396
397 /**
398 * Returns the number of total rollbacks
399 */
400 public long getTotalRollbacks() {
401 return totalRollBacks.longValue();
402 }
403
404 /**
405 * Reset statistics
406 */
407 public void resetStatistics() {
408 totalCommits.getAndSet(0);
409 totalRollBacks.getAndSet(0);
410 }
411 }