How an open-source OLAP engine (modified from the Spark Hive Thrift Server source) exposes its JDBC interface: a walkthrough

tech 2022-11-27

Entry-point demo

public static void main(String[] args) throws ClassNotFoundException, SQLException {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Connection conn = DriverManager.getConnection("jdbc:hive2://hostname:10007","admin","12345+");
    Connection conn = DriverManager.getConnection("jdbc:zk://hostname:2181/;tenant=admin", "carm", "12345+");
    PreparedStatement pstat = conn.prepareStatement("select * from tableGroupA where InfoIndex rlike 'abcxyz\\\\' limit 100");
    // PreparedStatement pstat = conn.prepareStatement("select * from default.testschema071902 limit 10");
    pstat.setQueryTimeout(666);
    pstat.execute();
    // Note: executeQuery() submits the statement again; pstat.getResultSet() would
    // reuse the result of the execute() call above. Kept as in the original demo.
    ResultSet rs = pstat.executeQuery();
    while (rs.next()) {
        String name = rs.getString("rowKey");
        System.out.println(name);
    }
}

As with JDBC for any ordinary database, the first step is to load the driver class with Class.forName. If anything about this step is unclear, see https://blog.csdn.net/qq_29924227/article/details/81042958

Next comes DriverManager.getConnection, which takes the JDBC URL plus the user name and password (for how it works, see https://blog.csdn.net/yumenshizhongjingjie/article/details/81036357, which is quite detailed).

Execution then enters org.apache.hive.jdbc.HiveDriver#connect

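public Connection connect(String url, Properties info) throws SQLException {
    if (acceptsURL(url))
        return new HiveConnection(url, info);
    return null;
}

acceptsURL(url) matches the incoming URL against a regular expression. Only two connection styles are supported, zk and hive2. If the URL matches, the flow continues and a new HiveConnection is created; otherwise connect returns null.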
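To make the URL check concrete, here is a minimal sketch of a regex-based acceptsURL. The pattern and the class name UrlCheckSketch are assumptions for illustration only; the actual expression used by the modified driver is not shown in this post.

```java
import java.util.regex.Pattern;

public class UrlCheckSketch {
    // Hypothetical pattern: accept only URLs starting with jdbc:hive2:// or jdbc:zk://
    private static final Pattern URL_PATTERN = Pattern.compile("jdbc:(hive2|zk)://.*");

    // Returns true when the whole URL matches the pattern, false for null or other schemes
    public static boolean acceptsURL(String url) {
        return url != null && URL_PATTERN.matcher(url).matches();
    }

    public static void main(String[] args) {
        System.out.println(acceptsURL("jdbc:zk://hostname:2181/;tenant=admin"));
        System.out.println(acceptsURL("jdbc:hive2://hostname:10007"));
        System.out.println(acceptsURL("jdbc:mysql://hostname:3306/db"));
    }
}
```

Returning null from connect for a URL the driver does not recognize is what lets DriverManager move on and try the next registered driver.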

HiveConnection initializes a client. There are two cases:

if (isEmbeddedMode) {
    EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService();
    embeddedClient.init(new HiveConf());
    client = embeddedClient;
} else {
    // extract user/password from JDBC connection properties if its not supplied in the
    // connection URL
    if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
        sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
        if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
            sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
        }
    }
    if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
        sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
    }
    // open the client transport
    openTransport();
    // set up the client
    client = new TCLIService.Client(new TBinaryProtocol(transport));
}
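The credential handling in the non-embedded branch can be isolated as a small function. The sketch below is only an illustration: the class name SessionConfSketch and the key strings are assumptions standing in for the JdbcConnectionParams constants. What is certain is that DriverManager.getConnection(url, user, password) hands the driver a Properties object with the keys user and password.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SessionConfSketch {
    // Assumed key names, standing in for JdbcConnectionParams.AUTH_USER / AUTH_PASSWD / AUTH_TYPE
    static final String AUTH_USER = "user";
    static final String AUTH_PASSWD = "password";
    static final String AUTH_TYPE = "auth";

    // Copies the authentication-related entries from the JDBC properties into a session map
    public static Map<String, String> fromProperties(Properties info) {
        Map<String, String> sessConf = new HashMap<>();
        if (info.containsKey(AUTH_USER)) {
            sessConf.put(AUTH_USER, info.getProperty(AUTH_USER));
            // Same nesting as the driver code: the password is only copied when a user is present
            if (info.containsKey(AUTH_PASSWD)) {
                sessConf.put(AUTH_PASSWD, info.getProperty(AUTH_PASSWD));
            }
        }
        if (info.containsKey(AUTH_TYPE)) {
            sessConf.put(AUTH_TYPE, info.getProperty(AUTH_TYPE));
        }
        return sessConf;
    }

    public static void main(String[] args) {
        Properties info = new Properties();
        info.setProperty("user", "carm");
        info.setProperty("password", "12345+");
        System.out.println(fromProperties(info).keySet());
    }
}
```

One consequence of the nesting is that a password supplied without a user name is silently ignored.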

The client is then used to open a session

TOpenSessionResp openResp = client.OpenSession(openReq);

We will not go into the detailed code that follows for now.

Let us go back to the code we started with:

PreparedStatement pstat = conn.prepareStatement("select * from tableGroupA where InfoIndex rlike 'abcxyz\\\\' limit 100");

This calls HiveConnection.prepareStatement(sql), which ultimately invokes the constructor below

public HivePreparedStatement(HiveConnection connection, TCLIService.Iface client,
        TSessionHandle sessHandle, String sql) {
    super(connection, client, sessHandle);
    this.sql = sql;
}

Then we execute

pstat.execute();

What ultimately runs here is

public boolean execute(String sql) throws SQLException {
    checkConnection("execute");
    closeClientOperation();
    initFlags();
    TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
    /**
     * Run asynchronously whenever possible
     * Currently only a SQLOperation can be run asynchronously,
     * in a background operation thread
     * Compilation is synchronous and execution is asynchronous
     */
    execReq.setRunAsync(true);
    sessConf.put(Utils.QUERY_TIME_OUT, String.valueOf(timeoutInSecond));
    execReq.setConfOverlay(sessConf);
    transportLock.lock();
    try {
        // This calls org.apache.hive.service.cli.thrift.ThriftCLIService#ExecuteStatement
        TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
        Utils.verifySuccessWithInfo(execResp.getStatus());
        stmtHandle = execResp.getOperationHandle();
        isExecuteStatementFailed = false;
    } catch (SQLException eS) {
        isExecuteStatementFailed = true;
        throw eS;
    } catch (Exception ex) {
        isExecuteStatementFailed = true;
        throw new SQLException(ex.toString(), "08S01", ex);
    } finally {
        transportLock.unlock();
    }
    TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
    boolean operationComplete = false;
    TGetOperationStatusResp statusResp;
    // Poll on the operation status, till the operation is complete
    while (!operationComplete) {
        try {
            /**
             * For an async SQLOperation, GetOperationStatus will use the long polling approach
             * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
             */
            transportLock.lock();
            try {
                statusResp = client.GetOperationStatus(statusReq);
            } finally {
                transportLock.unlock();
            }
            Utils.verifySuccessWithInfo(statusResp.getStatus());
            if (statusResp.isSetOperationState()) {
                switch (statusResp.getOperationState()) {
                case CLOSED_STATE:
                case FINISHED_STATE:
                    operationComplete = true;
                    break;
                case CANCELED_STATE:
                    // 01000 -> warning
                    throw new SQLException("Query was cancelled", "01000");
                case ERROR_STATE:
                    // Get the error details from the underlying exception
                    throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(),
                            statusResp.getErrorCode());
                case UKNOWN_STATE:
                    throw new SQLException("Unknown query", "HY000");
                case INITIALIZED_STATE:
                case PENDING_STATE:
                case RUNNING_STATE:
                    break;
                }
            }
        } catch (SQLException e) {
            isLogBeingGenerated = false;
            throw e;
        } catch (Exception e) {
            isLogBeingGenerated = false;
            throw new SQLException(e.toString(), "08S01", e);
        }
    }
    isLogBeingGenerated = false;
    // The query should be completed by now
    if (!stmtHandle.isHasResultSet()) {
        return false;
    }
    resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
            .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
            .setScrollable(isScrollableResultset).setTransportLock(transportLock)
            .build();
    return true;
}
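The core of execute is a submit-then-poll loop: the statement is submitted asynchronously, and the client keeps asking for the operation status until it reaches a terminal state. A stripped-down sketch of that loop follows. The State enum, the Supplier-based status call, and the class name PollSketch are hypothetical simplifications, not the Thrift types used by the driver.

```java
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class PollSketch {
    // Hypothetical, simplified stand-in for the Thrift operation states
    public enum State { PENDING, RUNNING, FINISHED, CANCELED, ERROR }

    // Polls until a terminal state is seen; returns the number of status calls made
    public static int waitForCompletion(Supplier<State> statusCall) throws SQLException {
        int polls = 0;
        while (true) {
            State state = statusCall.get();
            polls++;
            switch (state) {
                case FINISHED:
                    return polls;
                case CANCELED:
                    throw new SQLException("Query was cancelled", "01000");
                case ERROR:
                    throw new SQLException("Query failed", "HY000");
                default:
                    // PENDING or RUNNING: leave the switch and poll again
                    break;
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Scripted status sequence standing in for a server that finishes on the third poll
        Iterator<State> scripted =
                Arrays.asList(State.PENDING, State.RUNNING, State.FINISHED).iterator();
        System.out.println(waitForCompletion(scripted::next));
    }
}
```

In the real driver each status call blocks on the server side for up to the long-polling timeout, so the loop does not spin; the sketch omits that and the transport lock.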

We then enter the method org.apache.hive.service.cli.thrift.ThriftCLIService#ExecuteStatement

public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
    TExecuteStatementResp resp = new TExecuteStatementResp();
    try {
        SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
        String statement = req.getStatement();
        Map<String, String> confOverlay = req.getConfOverlay();
        Boolean runAsync = req.isRunAsync();
        OperationHandle operationHandle = runAsync
                ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
                : cliService.executeStatement(sessionHandle, statement, confOverlay);
        resp.setOperationHandle(operationHandle.toTOperationHandle());
        resp.setStatus(OK_STATUS);
    } catch (Exception e) {
        LOG.warn("Error executing statement: ", e);
        resp.setStatus(HiveSQLException.toTStatus(e));
    }
    return resp;
}
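One detail worth noticing in ExecuteStatement is that the server side never lets an exception escape: any failure is converted into a status carried in the response, and the client turns it back into a SQLException via Utils.verifySuccessWithInfo. The sketch below shows that pattern in isolation. Resp, execute, and the class name StatusWrapSketch are hypothetical names for illustration, not the Thrift types.

```java
import java.util.concurrent.Callable;

public class StatusWrapSketch {
    // Hypothetical, simplified response: a success flag plus either a handle or an error message
    public static final class Resp {
        public final boolean ok;
        public final String handle;
        public final String error;

        Resp(boolean ok, String handle, String error) {
            this.ok = ok;
            this.handle = handle;
            this.error = error;
        }
    }

    // Runs the operation and reports any exception as an error status instead of rethrowing it
    public static Resp execute(Callable<String> operation) {
        try {
            return new Resp(true, operation.call(), null);
        } catch (Exception e) {
            return new Resp(false, null, e.getMessage());
        }
    }

    public static void main(String[] args) {
        Resp good = execute(() -> "op-1");
        Resp bad = execute(() -> { throw new IllegalStateException("table not found"); });
        System.out.println(good.ok + " " + good.handle);
        System.out.println(bad.ok + " " + bad.error);
    }
}
```

Carrying errors in the response keeps the RPC layer uniform: every call returns a well-formed reply, and the decision to throw is left to the caller.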

From this point on, the remaining logic connects with my previous blog post, https://blog.csdn.net/riluomati/article/details/108349547

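This was a fairly rough read-through of the code. I mainly wanted to see where the overall flow goes and keep a record of it, so if anything here is wrong, please go easy on me. I have also come to appreciate how hard it is to describe a code flow fluently in words. That is all for now.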
