diff --git a/CHANGELOG.md b/CHANGELOG.md index f836a5c89..194994c72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +## 0.9.9 + +### Breaking Changes + +- **[jdbc-v2]** Hardcoded server setting `async_insert=0` is removed as well as others. This is done to +fix issue with overriding these settings and using client with read-only profiles. The change, first of all, makes +driver behavior to follow default what is set on server side (note: starting ClickHouse 26.3 `async_insert` is on by default). +In second, this fix changes what number of affected rows returned by method like `java.sql.Statement.executeUpdate(java.lang.String)`. +Previously they return more accurate values because insert was synchronous, but in case of asynchronous insert it is not +guaranteed anymore (see also https://github.com/ClickHouse/ClickHouse/issues/57768). Read more about asynchronous insert https://clickhouse.com/docs/optimize/asynchronous-inserts. +(https://github.com/ClickHouse/clickhouse-java/issues/2652, https://github.com/ClickHouse/clickhouse-java/issues/2825) + ## 0.9.8 ### Improvements diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/ServerSettings.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/ServerSettings.java index 24210866f..84c1b4400 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/ServerSettings.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/ServerSettings.java @@ -46,4 +46,9 @@ public final class ServerSettings { public static final String ASYNC_INSERT = "async_insert"; public static final String WAIT_ASYNC_INSERT = "wait_for_async_insert"; + + // Misc + public static final String ON = "1"; + + public static final String OFF = "0"; } diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java index 7328048ce..73cd00d7e 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/ConnectionImpl.java @@ -110,9 +110,7 @@ public ConnectionImpl(String url, Properties info) throws SQLException { this.client.loadServerInfo(); } this.schema = client.getDefaultDatabase(); - this.defaultQuerySettings = new QuerySettings() - .serverSetting(ServerSettings.ASYNC_INSERT, "0") - .serverSetting(ServerSettings.WAIT_END_OF_QUERY, "0"); + this.defaultQuerySettings = new QuerySettings(); this.metadata = new DatabaseMetaDataImpl(this, false, url); this.defaultCalendar = Calendar.getInstance(); diff --git a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java index 3680cc32b..d4d1c4987 100644 --- a/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java +++ b/jdbc-v2/src/main/java/com/clickhouse/jdbc/internal/JdbcConfiguration.java @@ -74,7 +74,7 @@ public boolean isIgnoreUnsupportedRequests() { /** * Parses URL to get property and target host. - * Properties that are passed in the {@code info} parameter will override that are set in the {@code url}. + * Properties that are passed in the {@code url} will override the {@code info} ones. * @param url - JDBC url * @param info - Driver and Client properties. */ @@ -89,7 +89,7 @@ public JdbcConfiguration(String url, Properties info) throws SQLException { Map urlProperties = parseUrl(url); String tmpConnectionUrl = urlProperties.remove(PARSE_URL_CONN_URL_PROP); - initProperties(urlProperties, props); + buildFinalProperties(urlProperties, props); // after initializing all properties - set final connection URL boolean useSSLInfo = Boolean.parseBoolean(props.getProperty(DriverProperties.SECURE_CONNECTION.getKey(), "false")); @@ -266,7 +266,12 @@ private Map parseUrl(String url) throws SQLException { return properties; } - private void initProperties(Map urlProperties, Properties providedProperties) { + /** + * Combines url properties and provided ones via {@link java.sql.Driver#connect(String, Properties)} + * @param urlProperties - properties parsed from URL + * @param providedProperties - properties object provided by application + */ + private void buildFinalProperties(Map urlProperties, Properties providedProperties) { // Copy provided properties Map props = new HashMap<>(); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java index 69bc2a11e..fcf3f98f8 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java @@ -4,6 +4,7 @@ import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseServerForTest; import com.clickhouse.client.api.ClientConfigProperties; +import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.data.ClickHouseVersion; import com.clickhouse.logging.Logger; @@ -13,11 +14,16 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; +import java.util.Map; import java.util.Properties; public abstract class JdbcIntegrationTest extends BaseIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcIntegrationTest.class); + public static final String WAIT_ASYNC_SETTING_KEY = DriverProperties.serverSetting(ServerSettings.WAIT_ASYNC_INSERT); + public static final String WAIT_QUERY_SETTING_KEY = DriverProperties.serverSetting(ServerSettings.WAIT_END_OF_QUERY); + public static final String ASYNC_INSERT_SETTING_KEY = DriverProperties.serverSetting(ServerSettings.ASYNC_INSERT); + public String getEndpointString() { return getEndpointString(isCloud()); } @@ -28,7 +34,13 @@ public String getEndpointString(boolean includeDbName) { } public Connection getJdbcConnection() throws SQLException { - return getJdbcConnection(null); + return getJdbcConnection((Properties) null); + } + + public Connection getJdbcConnection(Map propertiesMap) throws SQLException { + Properties config = new Properties(); + config.putAll(propertiesMap); + return getJdbcConnection(config); } public Connection getJdbcConnection(Properties properties) throws SQLException { diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java index d5bfb7978..6c76fe6f3 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/PreparedStatementTest.java @@ -1,6 +1,7 @@ package com.clickhouse.jdbc; import com.clickhouse.client.api.DataTypeUtils; +import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.data.ClickHouseVersion; @@ -956,7 +957,7 @@ void testBatchInsertTextStatement(String sql) throws Exception { String table = "test_batch_text"; long seed = System.currentTimeMillis(); Random rnd = new Random(seed); - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { stmt.execute("CREATE TABLE IF NOT EXISTS " + table + @@ -1007,7 +1008,7 @@ void testBatchInsertNoValuesReuse() throws Exception { String sql = "INSERT INTO %s (v1, v2) VALUES (?, ?)"; long seed = System.currentTimeMillis(); Random rnd = new Random(seed); - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { stmt.execute("CREATE TABLE IF NOT EXISTS " + table + @@ -1062,7 +1063,7 @@ void testBatchInsertValuesReuse() throws Exception { String sql = "INSERT INTO %s (v1, v2) VALUES (1, ?)"; long seed = System.currentTimeMillis(); Random rnd = new Random(seed); - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { stmt.execute("CREATE TABLE IF NOT EXISTS " + table + @@ -1266,7 +1267,7 @@ public void testJdbcEscapeSyntax() throws Exception { @Test(groups = {"integration "}) public void testStatementsWithDatabaseInTableIdentifier() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { final String db1Name = conn.getSchema() + "_db1"; final String table1Name = "table1"; try (Statement stmt = conn.createStatement()) { @@ -1296,7 +1297,7 @@ public void testStatementsWithDatabaseInTableIdentifier() throws Exception { @Test(groups = {"integration "}) public void testNullValues() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { final String table = "test_null_values"; try (Statement stmt = conn.createStatement()) { stmt.execute("DROP TABLE IF EXISTS " + table); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java index 575dde388..5fe6abeb4 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import static org.testng.Assert.assertEquals; @@ -148,7 +149,7 @@ public void testExecuteQueryDates() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateSimpleNumbers() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".simpleNumbers (num UInt8) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".simpleNumbers VALUES (1), (2), (3)"), 3); @@ -167,7 +168,7 @@ public void testExecuteUpdateSimpleNumbers() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateSimpleFloats() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".simpleFloats (num Float32) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".simpleFloats VALUES (1.1), (2.2), (3.3)"), 3); @@ -187,7 +188,7 @@ public void testExecuteUpdateSimpleFloats() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateBooleans() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".booleans (id UInt8, flag Boolean) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".booleans VALUES (0, true), (1, false), (2, true)"), 3); @@ -206,7 +207,7 @@ public void testExecuteUpdateBooleans() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateStrings() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".strings (id UInt8, words String) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".strings VALUES (0, 'Hello'), (1, 'World'), (2, 'ClickHouse')"), 3); @@ -225,7 +226,7 @@ public void testExecuteUpdateStrings() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateNulls() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".nulls (id UInt8, nothing Nullable(String)) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".nulls VALUES (0, 'Hello'), (1, NULL), (2, 'ClickHouse')"), 3); @@ -244,7 +245,7 @@ public void testExecuteUpdateNulls() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateDates() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".dates (id UInt8, date Nullable(Date), datetime Nullable(DateTime)) ENGINE = MergeTree ORDER BY ()"), 0); assertEquals(stmt.executeUpdate("INSERT INTO " + getDatabase() + ".dates VALUES (0, '2020-01-01', '2020-01-01 10:11:12'), (1, NULL, '2020-01-01 12:10:07'), (2, '2020-01-01', NULL)"), 3); @@ -264,10 +265,81 @@ public void testExecuteUpdateDates() throws Exception { } } + private static final int ASYNC_INSERT_SETTINGS_DP_ROWS = 100_000; + + @DataProvider(name = "asyncInsertSettingsDP") + public static Object[][] asyncInsertSettingsDP() { + return new Object[][]{ + // asyncInsert, waitAsyncInsert, expectedUpdateCount, expectedSelectCount, should fail + {ServerSettings.OFF, ServerSettings.OFF, ASYNC_INSERT_SETTINGS_DP_ROWS, ASYNC_INSERT_SETTINGS_DP_ROWS, true}, + {ServerSettings.OFF, ServerSettings.ON, ASYNC_INSERT_SETTINGS_DP_ROWS, ASYNC_INSERT_SETTINGS_DP_ROWS, true}, + {ServerSettings.ON, ServerSettings.OFF, 0, -1, false}, // return immediately + {ServerSettings.ON, ServerSettings.ON, ASYNC_INSERT_SETTINGS_DP_ROWS, ASYNC_INSERT_SETTINGS_DP_ROWS, true} + }; + } + + @Test(groups = {"integration"}, dataProvider = "asyncInsertSettingsDP") + public void testInsertWithAsyncInsert(String asyncInsert, String waitAsyncInsert, int expectedUpdateCount, int expectedSelectCount, boolean fails) throws Exception { + String tableName = "test_async_insert_param_" + asyncInsert + "_" + waitAsyncInsert + "_" + UUID.randomUUID().toString().replace("-", "_"); + + Properties props = new Properties(); + props.setProperty(ClientConfigProperties.serverSetting(ServerSettings.ASYNC_INSERT), asyncInsert); + props.setProperty(ClientConfigProperties.serverSetting(ServerSettings.WAIT_ASYNC_INSERT), waitAsyncInsert); + // Wait end of query off for isolation of this logic + props.setProperty(ClientConfigProperties.serverSetting(ServerSettings.WAIT_END_OF_QUERY), ServerSettings.OFF); + + if (waitAsyncInsert.equals(ServerSettings.ON)) { + // make it flush to disk to check that we get result. If buffer is bigger server may wait flushing to disk. + props.setProperty(ClientConfigProperties.serverSetting("async_insert_max_data_size"), "3488890"); + } + + StringBuilder sb = new StringBuilder("INSERT INTO " + getDatabase() + "." + tableName + " FORMAT TSV\n"); + for (int i = 0; i < ASYNC_INSERT_SETTINGS_DP_ROWS; i++) { + sb.append(i).append("\t") + .append("name_").append(i).append("\t") + .append(i * 1.1).append("\t") + .append(i % 2).append("\t") + .append("2023-01-01 10:11:12").append("\n"); + } + final String insertStatement = sb.toString(); + + try (Connection conn = getJdbcConnection(props)) { + try (Statement stmt = conn.createStatement()) { + stmt.execute("CREATE TABLE IF NOT EXISTS " + getDatabase() + "." + tableName + " (id UInt32, name String, value Float64, status Int8, timestamp DateTime) ENGINE = MergeTree ORDER BY id"); + stmt.execute("TRUNCATE TABLE " + getDatabase() + "." + tableName); + + int updateCount = stmt.executeUpdate(insertStatement); + + assertEquals(updateCount, expectedUpdateCount); + + try (ResultSet rs = stmt.executeQuery("SELECT count() FROM " + getDatabase() + "." + tableName)) { + assertTrue(rs.next()); + int count = rs.getInt(1); + if (expectedSelectCount == -1) { + assertTrue(count < ASYNC_INSERT_SETTINGS_DP_ROWS, "Expected count to be < " + ASYNC_INSERT_SETTINGS_DP_ROWS + ", but was: " + count); + } else { + assertEquals(count, expectedSelectCount); + } + } + + // verify error scenario + final String failingInsertStatement = insertStatement + + "some\t1invalid\trow\t10\n"; + + try { + stmt.executeUpdate(failingInsertStatement); + assertFalse(fails, "should fail"); + } catch (Exception e) { + assertTrue(fails, "should not fail"); + } + } + } + } + @Test(groups = {"integration"}) public void testExecuteUpdateBatch() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + getDatabase() + ".batch (id UInt8, num UInt8) ENGINE = MergeTree ORDER BY ()"), 0); stmt.addBatch("INSERT INTO " + getDatabase() + ".batch VALUES (0, 1)"); @@ -296,7 +368,7 @@ public void testExecuteUpdateBatch() throws Exception { @Test(groups = {"integration"}) public void testExecuteUpdateBatchReuse() throws Exception { String tableClause = getDatabase() + ".batch_reuse"; - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { try (Statement stmt = conn.createStatement()) { assertEquals(stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableClause + " (id UInt8, num UInt8) ENGINE = MergeTree ORDER BY ()"), 0); // add and execute first invalid batch @@ -664,7 +736,7 @@ public void testSwitchDatabase() throws Exception { @Test(groups = {"integration"}) public void testNewLineSQLParsing() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { String sqlCreate = "CREATE TABLE balance ( `id` UUID, `currency` String, `amount` Decimal(64, 18), `create_time` DateTime64(6), `_version` UInt64, `_sign` UInt8 ) ENGINE = ReplacingMergeTree PRIMARY KEY id ORDER BY id;"; try (Statement stmt = conn.createStatement()) { int r = stmt.executeUpdate(sqlCreate); @@ -729,7 +801,7 @@ public void testNewLineSQLParsing() throws Exception { @Test(groups = {"integration"}) public void testNullableFixedStringType() throws Exception { - try (Connection conn = getJdbcConnection()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF))) { String sqlCreate = "CREATE TABLE `data_types` (`f1` FixedString(4),`f2` LowCardinality(FixedString(4)), `f3` Nullable(FixedString(4)), `f4` LowCardinality(Nullable(FixedString(4))) ) ENGINE Memory;"; try (Statement stmt = conn.createStatement()) { int r = stmt.executeUpdate(sqlCreate); @@ -1133,7 +1205,7 @@ public void testExecute() throws Exception { Assert.assertNull(stmt.getResultSet()); } - try (Connection conn = getJdbcConnection(); Statement stmt = conn.createStatement()) { + try (Connection conn = getJdbcConnection(Map.of(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF)); Statement stmt = conn.createStatement()) { // no result set and update count Assert.assertFalse(stmt.execute("CREATE TABLE test_multi_result (id Int32) Engine MergeTree ORDER BY ()")); Assert.assertNull(stmt.getResultSet()); @@ -1326,6 +1398,7 @@ public void testDescribeStatement() throws Exception { public void testUnknownStatement(String parserName) throws Exception { Properties properties = new Properties(); properties.setProperty(DriverProperties.SQL_PARSER.getKey(), parserName); + properties.setProperty(ASYNC_INSERT_SETTING_KEY, ServerSettings.OFF); try (Connection conn = getJdbcConnection(properties)) { try (Statement stmt = conn.createStatement()) {