打算新开一坑,搞个支持动态SQL和对象映射的异步数据访问库。说到动态SQL和对象映射这就是Mybatis的强项,可是它背后的JDBC的标准是典型的同步阻塞IO。那数据库的异步驱动有吗?当然,postgresql-async支持PostgreSQL和MySQL,但是它的API偏原始,没有ORM特性。操作数据库事务这个问题总是最烦人的,尽量不要用但你又不能不支持,今天来看看mybatis-spring是如何实现的。
1 2 3 4 5 6 7 8
| @Transactional(value = "transactionManager") public void updateUserInfo(User u) { userMapper.updateUserinfo(u); bookMapper.updateUserInfo(u); bookApplyMapper.updateUserInfo(u); }
|
上面的代码简单的展示了Spring里声明式事务的使用,可以看到Spring把事务的使用做的及其简单,一些基础的配置配好后,一个@Transactional
解决问题。那问题来了它怎么实现的?我们知道Spring里事务的实现是AOP应用的一个典型,在没有看源码前我用大白话说一下我对实现这个功能自己的理解。事务的关键在于connection
的commit
和rollback
,在一个事务中多个SQL得共用一个connection
吧,怎么做呢?诶,动态代理。在上面的方法执行前去获取一个connection
,这个方法里的所有SQL操作共用这一个。怎么实现共用呢?如果没有事务,方法里的每一个SQL都会经历拿连接、执行、释放等操作;如果有事务就只要获取共用的连接、执行,不需要自己释放;可以看到两种方式下逻辑不同。怎么做呢?关于这两个问题朴素的想法是:
- 事务时我的连接存在一个地方大家都能拿到,这样就可以共用。
- 每条SQL执行时得知道自己是不是在一个大的事务中,这样才能实现不同的逻辑,才能确定我是去拿那个共用的还是自己创建一个用完释放掉。
那么关键就是共享,static?考虑一下两个并发请求执行同一事务,static这种全局的就不适用了吧,这种情况创建了两个connection
,但我不知道该用哪一个。所以实现事务动态代理是一个关键,线程绑定是另一个关键。把你的数据库连接和是否是一个大事务这个状态放ThreadLocal
里,这样可以做贡献还能处理线程间独立。
原生Mybatis代码
Mybatis和Spring整合前先看看原生Mybatis的SQL执行流程。
1 2 3 4 5 6 7
| String resource = "mybatis-config.xml" InputStream inputStream = Resources.getResourceAsStream(resource) SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream) SqlSession session = sqlSessionFactory.openSession(true) UserMapper mapper = session.getMapper(UserMapper.class) mapper.get() session.close()
|
先得理解SqlSession,它是执行一次SQL的会话,它也封装了事务操作的commit、rollback等操作。session.getMapper(UserMapper.class)
返回的是代理对象,代理对象由MapperProxyFactory
创建,MapperProxy
定义了真正的执行逻辑,最终SQL的执行又回到sqlSession里。sqlSession里有个executor成员变量,往往它是SimpleExecutor类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public int doUpdate(MappedStatement ms, Object parameter) throws SQLException { Statement stmt = null; try { Configuration configuration = ms.getConfiguration(); StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null); stmt = prepareStatement(handler, ms.getStatementLog()); return handler.update(stmt); } finally { closeStatement(stmt); } } private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Statement stmt; Connection connection = getConnection(statementLog); stmt = handler.prepare(connection, transaction.getTimeout()); handler.parameterize(stmt); return stmt; } protected Connection getConnection(Log statementLog) throws SQLException { Connection connection = transaction.getConnection(); if (statementLog.isDebugEnabled()) { return ConnectionLogger.newInstance(connection, statementLog, queryStack); } else { return connection; } }
|
Mybatis基本的代码很容易看懂,这里不多说。
Mybatis-Spring
Spring整合后的事务处理代码是从Spring里AOP的TransactionInterceptor拦截器开始的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public Object invoke(final MethodInvocation invocation) throws Throwable { Class targetClass = invocation.getThis() != null?AopUtils.getTargetClass(invocation.getThis()):null; return this.invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); } protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final TransactionAspectSupport.InvocationCallback invocation) throws Throwable { TransactionAspectSupport.TransactionInfo ex = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { retVal = invocation.proceedWithInvocation(); } catch (Throwable var15) { this.completeTransactionAfterThrowing(ex, var15); throw var15; } finally { this.cleanupTransactionInfo(ex); } this.commitTransactionAfterReturning(ex); return retVal;
}
|
到这里我们看到了代理的一个基本情况,我们进入到createTransactionIfNecessary
方法里最终来到AbstractPlatformTransactionManager
的getTransaction
方法
1 2 3 4 5 6 7
| public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { boolean err = this.getTransactionSynchronization() != 2; DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, err, debugEnabled, newSynchronization); this.doBegin(transaction, (TransactionDefinition)definition); this.prepareSynchronization(status, (TransactionDefinition)definition); return status; }
|
this.doBegin
会来到DataSourceTransactionManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction; Connection con = null;
try { if(txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection ex = this.dataSource.getConnection(); if(this.logger.isDebugEnabled()) { this.logger.debug("Acquired Connection [" + ex + "] for JDBC transaction"); }
txObject.setConnectionHolder(new ConnectionHolder(ex), true); }
txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer ex1 = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(ex1); if(con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if(this.logger.isDebugEnabled()) { this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); }
con.setAutoCommit(false); }
txObject.getConnectionHolder().setTransactionActive(true); int timeout = this.determineTimeout(definition); if(timeout != -1) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); }
if(txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder()); }
} catch (Throwable var7) { if(txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder((ConnectionHolder)null, false); }
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7); } }
|
TransactionSynchronizationManager
类很关键,线程本地变量存的连接和事务状态都在这里
1 2 3 4 5 6 7 8 9 10
| public abstract class TransactionSynchronizationManager { private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources"); private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal("Transaction synchronizations"); private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal("Current transaction name"); private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal("Current transaction read-only status"); private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal("Current transaction isolation level"); private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal("Actual transaction active"); }
|
以上还都是Spring对于事务的处理,那么来看看Mybatis都做了什么。原生mybatis的sqlsession是defaultsqlsession类型;spring整合后sqlsession是sqlsessiontemplate类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| public class SqlSessionTemplate implements SqlSession, DisposableBean { private final SqlSessionFactory sqlSessionFactory; private final ExecutorType executorType; private final SqlSession sqlSessionProxy; private final PersistenceExceptionTranslator exceptionTranslator; public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required"); notNull(executorType, "Property 'executorType' is required");
this.sqlSessionFactory = sqlSessionFactory; this.executorType = executorType; this.exceptionTranslator = exceptionTranslator; this.sqlSessionProxy = (SqlSession) newProxyInstance( SqlSessionFactory.class.getClassLoader(), new Class[] { SqlSession.class }, new SqlSessionInterceptor()); } private class SqlSessionInterceptor implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { SqlSession sqlSession = getSqlSession( SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator); try { Object result = method.invoke(sqlSession, args); if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) { sqlSession.commit(true); x } return result; } catch (Throwable t) { Throwable unwrapped = unwrapThrowable(t); if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) { closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); sqlSession = null; Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped); if (translated != null) { unwrapped = translated; } } throw unwrapped; } finally { if (sqlSession != null) { closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); } } } } }
|
看看SpringManagedTransaction
1 2 3 4 5 6
| private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); }
|
到这里整合实现了共享一个connection,可以看到mybatis整合了Spring自后,sqlsession里的Transaction改变了使得支持spring事务。
mapper注入
spring 扫描的Mapper 就是接口类型的Bean,mybatis 会把这个bean替换成一个factoryBean(工厂bean),getObject()方法得到的实例就是这个Bean的实例,当然这个实例一定是代理对象
private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitions) {
GenericBeanDefinition definition;
for (BeanDefinitionHolder holder : beanDefinitions) {
definition = (GenericBeanDefinition) holder.getBeanDefinition();
String beanClassName = definition.getBeanClassName();
LOGGER.debug(() -> "Creating MapperFactoryBean with name '" + holder.getBeanName()
+ "' and '" + beanClassName + "' mapperInterface");
//传入原本的接口定义的那个Mapper,mapperFactoryBean这个类的构造方法就是传入一个class
definition.getConstructorArgumentValues().addGenericArgumentValue(beanClassName);
//然后把bean的类型转成mapperFactoryBean,它的getObject()会产生代理对象
definition.setBeanClass(this.mapperFactoryBean.getClass());
definition.getPropertyValues().add("addToConfig", this.addToConfig);
//后面代码略
}
}