Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,12 @@ public PreparedStatement prepareStatement(
final int resultSetHoldability)
throws SQLException {
checkOpen();
return getMeta()
.createPreparedStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
return ArrowFlightPreparedStatement.builder(this)
.withQuery(sql)
.withGeneratedHandle()
.withResultSetType(resultSetType)
.withResultSetConcurrency(resultSetConcurrency)
.withResultSetHoldability(resultSetHoldability)
.build();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
throws SQLException {
super(statement, state, signature, resultSetMetaData, timeZone, firstFrame);
this.connection = (ArrowFlightConnection) statement.connection;
this.flightInfo = ((ArrowFlightInfoStatement) statement).executeFlightInfoQuery();
this.flightInfo = ((ArrowFlightMetaStatement) statement).executeFlightInfoQuery();
}

/** Private constructor for fromFlightInfo. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ public ArrowFlightMetaImpl(final AvaticaConnection connection) {

@Override
public void closeStatement(final StatementHandle statementHandle) {
AvaticaStatement statement = connection.statementMap.get(statementHandle.id);
if (statement instanceof ArrowFlightPreparedStatement) {
((ArrowFlightPreparedStatement) statement).closePreparedResources();
}
getMetaStatement(statementHandle).closeStatement();
}

@Override
Expand All @@ -64,8 +61,7 @@ public ExecuteResult execute(
final StatementHandle statementHandle,
final List<TypedValue> typedValues,
final long maxRowCount) {
return getPreparedStatementInstance(statementHandle)
.executeWithTypedValues(statementHandle, typedValues, maxRowCount);
return getMetaStatement(statementHandle).execute(statementHandle, typedValues, maxRowCount);
}

@Override
Expand All @@ -80,8 +76,7 @@ public ExecuteResult execute(
public ExecuteBatchResult executeBatch(
final StatementHandle statementHandle, final List<List<TypedValue>> parameterValuesList)
throws IllegalStateException {
return getPreparedStatementInstance(statementHandle)
.executeBatchWithTypedValues(statementHandle, parameterValuesList);
return getMetaStatement(statementHandle).executeBatch(statementHandle, parameterValuesList);
}

@Override
Expand All @@ -96,31 +91,16 @@ public Frame fetch(
String.format("%s does not use frames.", this), AvaticaConnection.HELPER.unsupported());
}

ArrowFlightPreparedStatement createPreparedStatement(
final String query,
final int resultSetType,
final int resultSetConcurrency,
final int resultSetHoldability)
throws SQLException {
return ArrowFlightPreparedStatement.builder((ArrowFlightConnection) connection)
.withQuery(query)
.withGeneratedHandle()
.withResultSetType(resultSetType)
.withResultSetConcurrency(resultSetConcurrency)
.withResultSetHoldability(resultSetHoldability)
.build();
}

@Override
public StatementHandle prepare(
final ConnectionHandle connectionHandle, final String query, final long maxRowCount) {
try {
return createPreparedStatement(
query,
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY,
connection.getHoldability())
.handle;
// This is the Avatica entry point used by Connection.prepareStatement(String).
ArrowFlightPreparedStatement stmt =
(ArrowFlightPreparedStatement)
connection.prepareStatement(
query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
return stmt.handle;
} catch (SQLException e) {
throw new RuntimeException(e);
}
Expand All @@ -133,6 +113,7 @@ public ExecuteResult prepareAndExecute(
final long maxRowCount,
final PrepareCallback prepareCallback)
throws NoSuchStatementException {
// This is the Avatica entry point used by Statement.execute(String).
return prepareAndExecute(
statementHandle, query, maxRowCount, -1 /* Not used */, prepareCallback);
}
Expand All @@ -146,20 +127,9 @@ public ExecuteResult prepareAndExecute(
final PrepareCallback callback)
throws NoSuchStatementException {
try {
final AvaticaStatement statement = connection.statementMap.get(handle.id);
if (!(statement instanceof ArrowFlightStatement)
&& !(statement instanceof ArrowFlightPreparedStatement)) {
throw new IllegalStateException("Prepared statement not found: " + handle);
}
if (statement instanceof ArrowFlightPreparedStatement) {
((ArrowFlightPreparedStatement) statement).closePreparedResources();
}
final ArrowFlightPreparedStatement preparedStatement =
ArrowFlightPreparedStatement.builder((ArrowFlightConnection) connection)
.withQuery(query)
.withExistingStatement(statement)
.build();
return preparedStatement.prepareAndExecute(callback);
// This is the Avatica entry point used by Statement.execute(String).
return getMetaStatement(handle)
.prepareAndExecute(query, maxRowCount, maxRowsInFirstFrame, callback);
} catch (SQLTimeoutException e) {
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and
// Runtime
Expand Down Expand Up @@ -216,30 +186,37 @@ void setDefaultConnectionProperties() {
.setTransactionIsolation(Connection.TRANSACTION_NONE);
}

private ArrowFlightPreparedStatement getPreparedStatementInstance(
StatementHandle statementHandle) {
private ArrowFlightMetaStatement getMetaStatement(StatementHandle statementHandle) {
AvaticaStatement statement = connection.statementMap.get(statementHandle.id);
if (!(statement instanceof ArrowFlightPreparedStatement)) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
if (statement instanceof ArrowFlightMetaStatement) {
return (ArrowFlightMetaStatement) statement;
}
return (ArrowFlightPreparedStatement) statement;
throw new IllegalStateException("Statement not found: " + statementHandle);
}

ArrowFlightPreparedStatement getPreparedStatementInstanceOrNull(StatementHandle statementHandle) {
AvaticaStatement statement = connection.statementMap.get(statementHandle.id);
if (statement instanceof ArrowFlightPreparedStatement) {
return (ArrowFlightPreparedStatement) statement;
}
return null;
public static Signature buildDefaultSignature() {
return buildSignature(null, StatementType.SELECT);
}

public static Signature buildDefaultSignature() {
return buildSignature(null, null, null);
public static Signature buildSignature(final String sql, final StatementType type) {
return buildSignature(sql, null, null, type);
}

/** Builds an Avatica signature from Arrow result and parameter schemas. */
public static Signature buildSignature(
final String sql, final Schema resultSetSchema, final Schema parameterSchema) {
StatementType statementType =
resultSetSchema == null || resultSetSchema.getFields().isEmpty()
? StatementType.IS_DML
: StatementType.SELECT;
return buildSignature(sql, resultSetSchema, parameterSchema, statementType);
}

private static Signature buildSignature(
final String sql,
final Schema resultSetSchema,
final Schema parameterSchema,
final StatementType statementType) {
List<ColumnMetaData> columnMetaData =
resultSetSchema == null
? new ArrayList<>()
Expand All @@ -248,10 +225,6 @@ public static Signature buildSignature(
parameterSchema == null
? new ArrayList<>()
: ConvertUtils.convertArrowFieldsToAvaticaParameters(parameterSchema.getFields());
StatementType statementType =
resultSetSchema == null || resultSetSchema.getFields().isEmpty()
? StatementType.IS_DML
: StatementType.SELECT;
return new Signature(
columnMetaData,
sql,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.arrow.flight.FlightInfo;
import org.apache.calcite.avatica.Meta.ExecuteBatchResult;
import org.apache.calcite.avatica.Meta.ExecuteResult;
import org.apache.calcite.avatica.Meta.PrepareCallback;
import org.apache.calcite.avatica.Meta.StatementHandle;
import org.apache.calcite.avatica.remote.TypedValue;

/** Statement capabilities used by {@link ArrowFlightMetaImpl}. */
interface ArrowFlightMetaStatement extends Statement {

@Override
ArrowFlightConnection getConnection() throws SQLException;

FlightInfo executeFlightInfoQuery() throws SQLException;

/**
* Avatica routes {@link Statement#execute(String)} through Meta.prepareAndExecute(...), so plain
* statements still need this hook even when they support direct executeQuery/executeUpdate paths.
*/
ExecuteResult prepareAndExecute(
String query, long maxRowCount, int maxRowsInFirstFrame, PrepareCallback callback)
throws SQLException;

default ExecuteResult execute(
final StatementHandle statementHandle,
final List<TypedValue> typedValues,
final long maxRowCount) {
throw new IllegalStateException(
"Statement operation is not supported for handle: " + statementHandle);
}

default ExecuteBatchResult executeBatch(
final StatementHandle statementHandle, final List<List<TypedValue>> parameterValuesList) {
throw new IllegalStateException(
"Statement operation is not supported for handle: " + statementHandle);
}

default void closeStatement() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

/** Arrow Flight JDBC's implementation {@link java.sql.PreparedStatement}. */
public class ArrowFlightPreparedStatement extends AvaticaPreparedStatement
implements ArrowFlightInfoStatement {
implements ArrowFlightMetaStatement {

private ArrowFlightSqlClientHandler.PreparedStatement preparedStatement;

Expand Down Expand Up @@ -80,6 +80,21 @@ ExecuteResult prepareAndExecute(final PrepareCallback callback) throws SQLExcept
return new ExecuteResult(Collections.singletonList(metaResultSet));
}

@Override
public ExecuteResult prepareAndExecute(
final String query,
final long maxRowCount,
final int maxRowsInFirstFrame,
final PrepareCallback callback)
throws SQLException {

return ArrowFlightPreparedStatement.builder(getConnection())
.withQuery(query)
.withExistingStatement(this)
.build()
.prepareAndExecute(callback);
}

Schema getDataSetSchema() {
ensurePrepared();
return preparedStatement.getDataSetSchema();
Expand Down Expand Up @@ -143,6 +158,25 @@ ExecuteBatchResult executeBatchWithTypedValues(
return new ExecuteBatchResult(updatedCounts);
}

@Override
public ExecuteResult execute(
final StatementHandle statementHandle,
final List<TypedValue> typedValues,
final long maxRowCount) {
return executeWithTypedValues(statementHandle, typedValues, maxRowCount);
}

@Override
public ExecuteBatchResult executeBatch(
final StatementHandle statementHandle, final List<List<TypedValue>> parameterValuesList) {
return executeBatchWithTypedValues(statementHandle, parameterValuesList);
}

@Override
public void closeStatement() {
closePreparedResources();
}

@Override
public FlightInfo executeFlightInfoQuery() throws SQLException {
ensurePrepared();
Expand Down
Loading