Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* External JDBC Catalog resource for external table query.
Expand Down Expand Up @@ -163,6 +165,8 @@ public class JdbcResource extends Resource {

// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;
private static final Pattern OCEANBASE_DRIVER_VERSION_PATTERN =
Pattern.compile("oceanbase-client-(\\d+(?:\\.\\d+){1,3})\\.jar", Pattern.CASE_INSENSITIVE);
@SerializedName(value = "configs")
private Map<String, String> configs;

Expand All @@ -185,7 +189,7 @@ public void modifyProperties(Map<String, String> properties) throws DdlException
for (String propertyKey : ALL_PROPERTIES) {
replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey));
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(DRIVER_URL)));
super.modifyProperties(properties);
}

Expand Down Expand Up @@ -216,7 +220,7 @@ protected void setProperties(ImmutableMap<String, String> properties) throws Ddl
throw new DdlException("JdbcResource Missing " + property + " in properties");
}
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(DRIVER_URL)));
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
}

Expand Down Expand Up @@ -396,6 +400,10 @@ public static String parseDbType(String url) throws DdlException {
}

public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
return handleJdbcUrl(jdbcUrl, null);
}

public static String handleJdbcUrl(String jdbcUrl, String driverUrl) throws DdlException {
// delete all space in jdbcUrl
String newJdbcUrl = jdbcUrl.replaceAll(" ", "");
String dbType = parseDbType(newJdbcUrl);
Expand All @@ -415,6 +423,10 @@ public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
if (dbType.equals(OCEANBASE)) {
// set useCursorFetch to true
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl, "useCursorFetch", "false", "true");
if (shouldDisableOceanBaseLegacyDatetimeCode(driverUrl)) {
newJdbcUrl = checkAndSetJdbcBoolParam(dbType, newJdbcUrl,
"useLegacyDatetimeCode", "true", "false");
}
}
}
if (dbType.equals(POSTGRESQL)) {
Expand All @@ -429,6 +441,31 @@ public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
return newJdbcUrl;
}

private static boolean shouldDisableOceanBaseLegacyDatetimeCode(String driverUrl) {
if (driverUrl == null || driverUrl.isEmpty()) {
return false;
}
Matcher matcher = OCEANBASE_DRIVER_VERSION_PATTERN.matcher(driverUrl);
if (!matcher.find()) {
return false;
}
return compareVersion(matcher.group(1), "2.4.15") >= 0;
}

private static int compareVersion(String leftVersion, String rightVersion) {
String[] left = leftVersion.split("\\.");
String[] right = rightVersion.split("\\.");
int maxLength = Math.max(left.length, right.length);
for (int i = 0; i < maxLength; i++) {
int leftPart = i < left.length ? Integer.parseInt(left[i]) : 0;
int rightPart = i < right.length ? Integer.parseInt(right[i]) : 0;
if (leftPart != rightPart) {
return Integer.compare(leftPart, rightPart);
}
}
return 0;
}

/**
* Check jdbcUrl param, if the param is not set, set it to the expected value.
* If the param is set to an unexpected value, replace it with the expected value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ protected Map<String, String> processCompatibleProperties(Map<String, String> pr
}
String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
if (!Strings.isNullOrEmpty(jdbcUrl)) {
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl, properties.get(JdbcResource.DRIVER_URL));
properties.put(JdbcResource.JDBC_URL, jdbcUrl);
}
return properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.doris.datasource.jdbc.client;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
Expand All @@ -35,7 +37,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -168,12 +172,18 @@ public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remo
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);

Map<String, String> mapFieldtoType = Maps.newHashMap();
if (isDoris) {
if (requiresFullTypeDefinition()) {
mapFieldtoType = getColumnsDataTypeUseQuery(remoteDbName, remoteTableName);
}

while (rs.next()) {
JdbcFieldSchema field = new JdbcFieldSchema(rs, mapFieldtoType);
JdbcFieldSchema field = new JdbcFieldSchema(rs);
String fullTypeName = mapFieldtoType.get(field.getColumnName());
if (isDoris) {
field.setDataTypeName(Optional.ofNullable(fullTypeName));
} else {
field.setFullDataTypeName(Optional.ofNullable(fullTypeName));
}
tableSchema.add(field);
}
} catch (SQLException e) {
Expand Down Expand Up @@ -225,27 +235,31 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
if (isDoris) {
return dorisTypeToDoris(fieldSchema);
}
// For mysql type: "INT UNSIGNED":
// fieldSchema.getDataTypeName().orElse("unknown").split(" ")[0] == "INT"
// fieldSchema.getDataTypeName().orElse("unknown").split(" ")[1] == "UNSIGNED"
String[] typeFields = fieldSchema.getDataTypeName().orElse("unknown").split(" ");
String mysqlType = typeFields[0];
return mysqlTypeToDoris(fieldSchema, enableMappingVarbinary, enableMappingTimestampTz, convertDateToNull);
}

@VisibleForTesting
static Type mysqlTypeToDoris(JdbcFieldSchema fieldSchema, boolean enableMappingVarbinary,
boolean enableMappingTimestampTz, boolean convertDateToNull) {
MySqlTypeDescriptor typeDescriptor = MySqlTypeDescriptor.from(fieldSchema);
String mysqlType = typeDescriptor.baseType;
// For unsigned int, should extend the type.
if (typeFields.length > 1 && "UNSIGNED".equals(typeFields[1])) {
if (typeDescriptor.unsigned) {
switch (mysqlType) {
case "TINYINT":
return Type.SMALLINT;
case "SMALLINT":
case "MEDIUMINT":
return Type.INT;
case "INT":
case "INTEGER":
return Type.BIGINT;
case "BIGINT":
return Type.LARGEINT;
case "DECIMAL": {
int precision = fieldSchema.requiredColumnSize() + 1;
int scale = fieldSchema.requiredDecimalDigits();
return createDecimalOrStringType(precision, scale);
int precision = getColumnLength(fieldSchema, typeDescriptor) + 1;
int scale = getDecimalScale(fieldSchema, typeDescriptor);
return createMysqlDecimalOrStringType(precision, scale);
}
case "DOUBLE":
// As of MySQL 8.0.17, the UNSIGNED attribute is deprecated
Expand All @@ -260,15 +274,20 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
}
}
switch (mysqlType) {
case "BOOL":
case "BOOLEAN":
return Type.BOOLEAN;
case "TINYINT":
if (fieldSchema.getDataType() == Types.BIT && getOptionalColumnLength(fieldSchema, typeDescriptor) == 1) {
return Type.BOOLEAN;
}
return Type.TINYINT;
case "SMALLINT":
case "YEAR":
return Type.SMALLINT;
case "MEDIUMINT":
case "INT":
case "INTEGER":
return Type.INT;
case "BIGINT":
return Type.BIGINT;
Expand All @@ -278,11 +297,7 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
}
return ScalarType.createDateV2Type();
case "TIMESTAMP": {
int columnSize = fieldSchema.requiredColumnSize();
int scale = columnSize > 19 ? columnSize - 20 : 0;
if (scale > 6) {
scale = 6;
}
int scale = getDateTimeScale(fieldSchema, typeDescriptor);
if (convertDateToNull) {
fieldSchema.setAllowNull(true);
}
Expand All @@ -291,12 +306,8 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
}
case "DATETIME": {
// mysql can support microsecond
// use columnSize to calculate the precision of timestamp/datetime
int columnSize = fieldSchema.requiredColumnSize();
int scale = columnSize > 19 ? columnSize - 20 : 0;
if (scale > 6) {
scale = 6;
}
// use type definition when available, otherwise fall back to column size metadata
int scale = getDateTimeScale(fieldSchema, typeDescriptor);
if (convertDateToNull) {
fieldSchema.setAllowNull(true);
}
Expand All @@ -307,24 +318,24 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
case "DOUBLE":
return Type.DOUBLE;
case "DECIMAL": {
int precision = fieldSchema.requiredColumnSize();
int scale = fieldSchema.requiredDecimalDigits();
return createDecimalOrStringType(precision, scale);
int precision = getColumnLength(fieldSchema, typeDescriptor);
int scale = getDecimalScale(fieldSchema, typeDescriptor);
return createMysqlDecimalOrStringType(precision, scale);
}
case "CHAR":
return ScalarType.createCharType(fieldSchema.requiredColumnSize());
return ScalarType.createCharType(getColumnLength(fieldSchema, typeDescriptor));
case "VARCHAR":
return ScalarType.createVarcharType(fieldSchema.requiredColumnSize());
return ScalarType.createVarcharType(getColumnLength(fieldSchema, typeDescriptor));
case "TINYBLOB":
case "BLOB":
case "MEDIUMBLOB":
case "LONGBLOB":
case "BINARY":
case "VARBINARY":
return enableMappingVarbinary ? ScalarType.createVarbinaryType(fieldSchema.requiredColumnSize())
return enableMappingVarbinary ? ScalarType.createVarbinaryType(getColumnLength(fieldSchema, typeDescriptor))
: ScalarType.createStringType();
case "BIT":
if (fieldSchema.requiredColumnSize() == 1) {
if (getOptionalColumnLength(fieldSchema, typeDescriptor) == 1) {
return Type.BOOLEAN;
} else {
return ScalarType.createStringType();
Expand All @@ -344,6 +355,10 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
}
}

private boolean requiresFullTypeDefinition() {
return isDoris || JdbcResource.OCEANBASE.equals(dbType);
}

private boolean isConvertDatetimeToNull(JdbcClientConfig jdbcClientConfig) {
// Check if the JDBC URL contains "zeroDateTimeBehavior=convertToNull" or "zeroDateTimeBehavior=convert_to_null"
String jdbcUrl = jdbcClientConfig.getJdbcUrl().toLowerCase();
Expand Down Expand Up @@ -462,4 +477,89 @@ private Type dorisTypeToDoris(JdbcFieldSchema fieldSchema) {
return Type.UNSUPPORTED;
}
}

private static int getColumnLength(JdbcFieldSchema fieldSchema, MySqlTypeDescriptor typeDescriptor) {
return typeDescriptor.length.orElseGet(fieldSchema::requiredColumnSize);
}

private static int getOptionalColumnLength(JdbcFieldSchema fieldSchema, MySqlTypeDescriptor typeDescriptor) {
return typeDescriptor.length.orElseGet(() -> fieldSchema.getColumnSize().orElse(0));
}

private static int getDecimalScale(JdbcFieldSchema fieldSchema, MySqlTypeDescriptor typeDescriptor) {
return typeDescriptor.scale.orElseGet(fieldSchema::requiredDecimalDigits);
}

private static int getDateTimeScale(JdbcFieldSchema fieldSchema, MySqlTypeDescriptor typeDescriptor) {
int scale = typeDescriptor.length.orElseGet(() -> {
int columnSize = fieldSchema.getColumnSize().orElse(0);
return columnSize > 19 ? columnSize - 20 : 0;
});
return Math.min(scale, 6);
}

private static Type createMysqlDecimalOrStringType(int precision, int scale) {
if (precision <= ScalarType.MAX_DECIMAL128_PRECISION && precision > 0) {
return ScalarType.createDecimalV3Type(precision, scale);
}
return ScalarType.createStringType();
}

private static class MySqlTypeDescriptor {
private final String baseType;
private final boolean unsigned;
private final Optional<Integer> length;
private final Optional<Integer> scale;

private MySqlTypeDescriptor(String baseType, boolean unsigned, Optional<Integer> length,
Optional<Integer> scale) {
this.baseType = baseType;
this.unsigned = unsigned;
this.length = length;
this.scale = scale;
}

private static MySqlTypeDescriptor from(JdbcFieldSchema fieldSchema) {
String typeName = fieldSchema.getFullDataTypeName()
.orElse(fieldSchema.getDataTypeName().orElse("unknown"));
String normalized = typeName == null ? "" : typeName.trim().replaceAll("\\s+", " ");
if (normalized.isEmpty()) {
return new MySqlTypeDescriptor("UNKNOWN", false, Optional.empty(), Optional.empty());
}

String upperType = normalized.toUpperCase(Locale.ROOT);
boolean unsigned = upperType.contains(" UNSIGNED");
int openParen = upperType.indexOf('(');
int firstSpace = upperType.indexOf(' ');
int endIndex = upperType.length();
if (openParen >= 0) {
endIndex = openParen;
} else if (firstSpace >= 0) {
endIndex = firstSpace;
}
String baseType = upperType.substring(0, endIndex);

Optional<Integer> length = Optional.empty();
Optional<Integer> scale = Optional.empty();
if (openParen >= 0) {
int closeParen = upperType.indexOf(')', openParen + 1);
if (closeParen > openParen + 1) {
String[] parameters = upperType.substring(openParen + 1, closeParen).split(",");
length = parseTypeParameter(parameters[0]);
if (parameters.length > 1) {
scale = parseTypeParameter(parameters[1]);
}
}
}
return new MySqlTypeDescriptor(baseType, unsigned, length, scale);
}

private static Optional<Integer> parseTypeParameter(String parameter) {
try {
return Optional.of(Integer.parseInt(parameter.trim()));
} catch (NumberFormatException e) {
return Optional.empty();
}
}
}
}
Loading
Loading