ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

sharding-jdbc处理流程源码分析

2021-05-09 02:01:55  阅读:201  来源: 互联网

标签:jdbc route 分片 源码 result sql sharding 路由


目录

前言

sharding-jdbc主要功能是分片,我们实现不同分片算法来进行分库分表,另外一个扩展点就是主键生成, 本文主要记录下sharding-jdbc执行流程和分片路由具体实现以及主键生成,在工作中方便排查问题。

主要记录三个问题:

1.sharding-jdbc执行流程

2.自定义分片算法是如何被sharding-jdbc框架调用的

3.主键是在何处何时生成

4.扩展机制spi

1. sharding-jdbc处理流程

操作数据库套路是:数据源获取数据库连接,数据库连接生成Statement,然后执行Statement,获取sql执行结果。

那么对于sharding来说

入口获取数据库连接就是ShardingDataSource.getConnection()

接着生成PreparedStatement:ShardingConnection.prepareStatement(String),生成ShardingPreparedStatement

对于增删改查就是ShardingPreparedStatement的execute()、executeUpdate()、executeQuery()、、

execute()为例:

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.execute()
@Override
public boolean execute() throws SQLException {
    try {
        clearPrevious();//本地缓存清空
        shard();//路由,路由结果保存到this.routeResult。核心功能
        initPreparedStatementExecutor();//初始化执行器
        return preparedStatementExecutor.execute();//真实sql执行jdbc操作
    } finally {
        clearBatch();
    }
}

分析核心路由功能shard()

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.shard()
private void shard() {
        routeResult = shardingEngine.shard(sql, getParameters());
    }

//org.apache.shardingsphere.core.BaseShardingEngine.shard(String, List<Object>)
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
    List<Object> clonedParameters = cloneParameters(parameters);
    SQLRouteResult result = route(sql, clonedParameters);//路由核心实现
    result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));//非hint,重写sql
    if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
        boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
        SQLLogger.logSQL(sql, showSimple, result.getSqlStatement(), result.getRouteUnits());//打印真实sql
    }
    return result;
}

//org.apache.shardingsphere.core.PreparedQueryShardingEngine.route(String, List<Object>)
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
    return routingEngine.route(parameters);
}
//org.apache.shardingsphere.core.route.PreparedStatementRoutingEngine.route(List<Object>)
public SQLRouteResult route(final List<Object> parameters) {
    if (null == sqlStatement) {
        sqlStatement = shardingRouter.parse(logicSQL, true);//代码@1
    }
    return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));//代码@2
}

代码@1

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.parse(String, boolean) 
//解析sql
@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
    parsingHook.start(logicSQL);//sharding-jdbc为开发预留的钩子,我们可以实现钩子接口在解析sql前后做一些扩展
    try {
        SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable(), parsingResultCache).parse(useCache);//代码@1.1,解析sql的核心
        parsingHook.finishSuccess(result, shardingMetaData.getTable());
        return result;
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        parsingHook.finishFailure(ex);
        throw ex;
    }
}

代码@1处解析sql比较复杂,只需要知道是解析sql,解析结果SQLStatement,这个是也不是我们的关注点,知道有个hook接口可以在sql解析前后进行扩展即可,比如通过该Hook可以用作计算sql执行时长。

知道增删改查对对应的SQLStatement如下:

对于insert来说SQLStatement是InsertStatement。DML

对于update delete语句来说SQLStatement是DMLStatement。DML

对于select语句来说SQLStatement是SelectStatement。 DQL

SQLStatement是个逻辑sql。

类关系图如下:

image-20210420232603003

代码@2

masterSlaveRouter是读写分离路由,不使用的情况下,可以忽略。

分片的路由核心实现在shardingRouter.route(logicSQL, parameters, sqlStatement),下面分析这个

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List<Object>, SQLStatement)
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
                ? GeneratedKey.getGenerateKey(shardingRule, parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();//代码@2.1
        SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());//代码@2.2
        OptimizeResult optimizeResult = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();//代码@2.3
        if (generatedKey.isPresent()) {
            setGeneratedKeys(result, generatedKey.get());//代码@2.4
        }
        boolean needMerge = false;
        if (sqlStatement instanceof SelectStatement) {
            needMerge = isNeedMergeShardingValues((SelectStatement) sqlStatement);//代码@2.5
        }
        if (needMerge) {
            checkSubqueryShardingValues(sqlStatement, optimizeResult.getShardingConditions());
            mergeShardingValues(optimizeResult.getShardingConditions());//代码@2.6
        }
        RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, optimizeResult).route();//代码@2.7
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit() && !routingResult.isSingleRouting()) {
            result.setLimit(getProcessedLimit(parameters, (SelectStatement) sqlStatement));//代码@2.8
        }
        if (needMerge) {
            Preconditions.checkState(1 == routingResult.getTableUnits().getTableUnits().size(), "Must have one sharding with subquery.");//代码@2.9
        }
        result.setRoutingResult(routingResult);//代码@2.10
        result.setOptimizeResult(optimizeResult);
        return result;//代码@2.11
    }

从上面路由核心代码可以看出,ShardingRouter是解析和路由的核心接口,其实现类为ParsingSQLRouter,它使用四个引擎对sql进行解析、解析和重写,这四个引擎为:

  • SQLParsingEngine
    解析sql,返回SQLStatement作为解析的结果。

  • OptimizeEngine
    对SQLStatement进行优化,返回ShardingConditions对象。

  • RoutingEngine
    根据库表分片配置以及ShardingConditions找到目标库表,返回RoutingResult对象。

  • SQLRewriteEngine
    根据路由结果重写sql。

代码@2.1处:如果是insert,则生成分布式主键,GeneratedKey封装了分片键和分布式主键值。比如,insert语句,这里使用Snowflake算法生成分布式主键。

代码@2.2处:创建sql路由结果对象SQLRouteResult,封装SQLStatement和分布式主键对象GeneratedKey。此时SQLRouteResult只是包含了sql语句和主键值,并没有生成实际待执行sql。

代码@2.3处:使用OptimizeEngine对SQLStatement进行优化,返回OptimizeResult对象。该对象在重写sql时候用到,作用就是对SQLStatement进行优化,返回ShardingConditions对象。

代码@2.4处:保存生成的分布式主键。

代码@2.5处:select语句是否需要合并结果

代码@2.6处:需要合并查询结果,则合并

代码@2.7处:使用不同的RoutingEngine生成路由结果RoutingResult。比如标准分片是StandardRoutingEngine、复合分片是ComplexRoutingEngine、广播是DatabaseBroadcastRoutingEngine、不分片是DefaultDatabaseRoutingEngine等。 这里是核心代码,总体功能就是路由,找到实际的数据源和真实表

image-20210414221409197

代码@2.8处:select语句设置limit。既然分库分表了,通常也就不使用分页了。

代码@2.9处:预检,需要合并结果,需要分片键在查询结果上。

代码@2.10处:把路由结果RoutingResult、优化结果OptimizeResult保存到SQLRouteResult。

代码@2.11处:返回sql路由结果对象SQLRouteResult,该对象封装了路由结果,知道要到哪个真实库去执行哪个真实表。

核心代码@2.7处分析

RoutingEngineFactory.newInstance()根据不同的分片规则采用对应的RoutingEngine生成路由结果RoutingResult,以标准分片路由为例

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.route()
@Override
public RoutingResult route() {
    return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));//1.getTableRule根据逻辑表获取TableRule,2.getDataNodes根据TableRule和分片算法获取真实的数据源和真实表Collection<DataNode>
}
//shardingRule.getTableRule(logicTableName)根据逻辑表从分片规则ShardingRule获取表规则TableRule,TableRule信息封装的较多,有逻辑表、全部数据源等

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.getDataNodes(TableRule)
//获取真实节点,真实数据源和真实表。DataNode封装了真实数据源和真实表
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
    if (shardingRule.isRoutingByHint(tableRule)) {//hint路由
        return routeByHint(tableRule);
    }
    if (isRoutingByShardingConditions(tableRule)) {//条件路由,即非hint路由
        return routeByShardingConditions(tableRule);
    }
    return routeByMixedConditions(tableRule);
}

private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
    return optimizeResult.getShardingConditions().getShardingConditions().isEmpty() ? route(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList())
        : routeByShardingConditionsWithCondition(tableRule);
}

private Collection<DataNode> route(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
    Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);//获取真实数据源
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedDataSources) {
        result.addAll(routeTables(tableRule, each, tableShardingValues));//获取真实表
    }
    return result;
}

//根据分片键获取数据源
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
    Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
    if (databaseShardingValues.isEmpty()) {
        return availableTargetDatabases;
    }
    Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));//这里通过分片策略调用自定义的分片算法
    Preconditions.checkState(!result.isEmpty(), "no database route info");
    return result;
}

//根据分片键获取DataNode,即数据源+真实表
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
    Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
    Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                                                          : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));//这里通过分片策略调用自定义的分片算法
    Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedTables) {
        result.add(new DataNode(routedDataSource, each));
    }
    return result;
}

可以看到route()方法是入口,此方法首先通过ShardingRule获取到逻辑表所对应的TableRule对象,在sharding-jdbc启动阶段,TableRule保存了逻辑表对应的实际的库表关系集合,接着根据库和表的ShardingStrategy的类型走了三个不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪个方法最终都会执行到含有三个参数的route()方法,此方法先调用routeDataSources()方法路由数据源(库),接着调用routeTables()方法路由表,路由库表的方法也很简单:

从TableRule中获取可用的库表集合。
从TableRule中获取库表的分片策略ShardingStrategy对象。
执行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的库表。
路由的结果以RoutingResult的形式返回,接着调用SQLRewriteEngine重写sql,因为此时sql中的表还只是逻辑表名,并不是具体的哪个表,接着生成SQLUnit,并最终以SQLRouteResult形式返回路由结果。

重点是个SQLRouteResult,关系较复杂,类图封装关系如下

image-20210421004127246

使用xmind画出的处理流程

sharding-jdbc处理流程

思维导图地址:https://gitee.com/yulewo123/mdpicture/blob/master/document/sharding-jdbc%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B.xmind

总结:

sharding-jdbc的处理流程核心就是路由,即根据分片键以及算法从从TableRule.actualDataNodes获取真实库表对象DataNode。那么TableRule是怎么来的呢?是ShardingRule根据逻辑表获取,而ShardingRule是核心,在sharding-jdbc启动时候就创建完成。

路由获取后,就可以重写sql,然后通过jdbc执行sql到真实的数据源执行真实sql。

关键debug点记录如下,工作中遇到问题,方便快速回顾debug定为问题

org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List, SQLStatement) 路由核心

2.sharding-jdbc的扩展点

sharding-jdbc设计采用jdk的spi进行扩展,所有扩展注册都会调用org.apache.shardingsphere.core.spi.NewInstanceServiceLoader.register(Class<T>),因此跟踪这个方法调用如下

image-20210424234705670

前面三个是在启动过程进行注册,后面五个是在首次运行过程中进行注册。

列举出sharding的spi扩展点

启动过程

org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm 读写分离扩展,可以扩展使用什么算法来选择对应的数据源。

org.apache.shardingsphere.spi.encrypt.ShardingEncryptor 加密方式扩展,扩展加密方式

interface org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator 分布式主键生成,可以通过实行这个接口增加分布式主键生成算法

首次运行

interface org.apache.shardingsphere.core.execute.hook.RootInvokeHook 根调用钩子,具体就是在创建shardingConnection时候就启用,关闭时候完成,可以用于统计一个sharding从开始执行到执行结束耗时,可用于监控

interface org.apache.shardingsphere.core.parse.hook.ParsingHook sql解析钩子,比如可以用于统计sql解析耗时等

interface org.apache.shardingsphere.core.parse.spi.ShardingParseEngine 解析引擎,sharding-jdbc已经针对每个数据库类型增加了对应的解析引擎

interface org.apache.shardingsphere.core.rewrite.hook.RewriteHook 重写sql钩子

interface org.apache.shardingsphere.core.execute.hook.SQLExecutionHook sql执行钩子,可以计算sql执行时长,用于监控

以上这些spi接口,断点打在这些spi接口的子类hook上即可,观测到调用。

比如添加一个自定义打印sql执行耗时:

public class CustomShardingSQLExecutionHook implements SQLExecutionHook {

	@Override
	public void start(RouteUnit routeUnit, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread,
			Map<String, Object> shardingExecuteDataMap) {
		// TODO Auto-generated method stub
		System.err.println("start");//控制台红色打印
	}

	@Override
	public void finishSuccess() {
		// TODO Auto-generated method stub
		System.err.println("finishSuccess");

	}

	@Override
	public void finishFailure(Exception cause) {
		// TODO Auto-generated method stub
		System.err.println("finishFailure");
	}
}
//同时在resources下创建META-INF/services/org.apache.shardingsphere.core.execute.hook.SQLExecutionHook,内容如下
//com.zyj.sharding.hook.CustomShardingSQLExecutionHook
//这样自定义的sql执行钩子就生效了,就是jdk的spi写法。

参考 https://www.jianshu.com/p/4cb5b2b68f8e

标签:jdbc,route,分片,源码,result,sql,sharding,路由
来源: https://www.cnblogs.com/zhangyjblogs/p/14747057.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有