diff --git a/dataset/pom.xml b/dataset/pom.xml index 7a0210ce95..6824cb2f61 100644 --- a/dataset/pom.xml +++ b/dataset/pom.xml @@ -111,6 +111,67 @@ under the License. + + org.apache.hadoop + hadoop-hdfs + ${dep.hadoop.version} + test + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-hdfs + ${dep.hadoop.version} + test-jar + test + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-minicluster + ${dep.hadoop.version} + test + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + com.google.guava guava diff --git a/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index fcf124a61f..5c0c5f28d0 100644 --- a/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -16,7 +16,11 @@ */ package org.apache.arrow.dataset.file; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.LinkedHashSet; import java.util.Optional; +import java.util.Set; import org.apache.arrow.dataset.jni.NativeDatasetFactory; import org.apache.arrow.dataset.jni.NativeMemoryPool; import org.apache.arrow.dataset.scanner.FragmentScanOptions; @@ -25,9 +29,12 @@ /** Java binding of the C++ FileSystemDatasetFactory. */ public class FileSystemDatasetFactory extends NativeDatasetFactory { + private final String[] uris; + public FileSystemDatasetFactory( BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) { super(allocator, memoryPool, createNative(format, uri, Optional.empty())); + this.uris = uri == null ? new String[0] : new String[] {uri}; } public FileSystemDatasetFactory( @@ -37,11 +44,13 @@ public FileSystemDatasetFactory( String uri, Optional fragmentScanOptions) { super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions)); + this.uris = uri == null ? new String[0] : new String[] {uri}; } public FileSystemDatasetFactory( BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) { super(allocator, memoryPool, createNative(format, uris, Optional.empty())); + this.uris = uris == null ? new String[0] : uris.clone(); } public FileSystemDatasetFactory( @@ -51,6 +60,73 @@ public FileSystemDatasetFactory( String[] uris, Optional fragmentScanOptions) { super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions)); + this.uris = uris == null ? new String[0] : uris.clone(); + } + + /** + * Close this factory and release the native instance. For HDFS URIs, also closes the cached + * Hadoop FileSystem to release non-daemon threads that would otherwise prevent JVM exit. See #1067. + */ + @Override + public synchronized void close() { + try { + super.close(); + } finally { + closeHadoopFileSystemsIfHdfs(uris); + } + } + + /** + * For each {@code hdfs://} URI, close the cached Hadoop FileSystem. + * When Arrow C++ accesses HDFS via libhdfs, the Hadoop Java client creates cached FileSystem + * instances with non-daemon threads (IPC connections, lease renewers) that prevent JVM exit. + * Closing the FileSystem terminates these connections. Uses reflection to avoid a compile-time + * dependency on hadoop-common. + */ + static void closeHadoopFileSystemsIfHdfs(String... uris) { + if (uris == null || uris.length == 0) { + return; + } + Set hdfsFileSystems = new LinkedHashSet<>(); + for (String uri : uris) { + URI hdfsUri = toHdfsFileSystemUri(uri); + if (hdfsUri != null) { + hdfsFileSystems.add(hdfsUri); + } + } + for (URI hdfsUri : hdfsFileSystems) { + closeHadoopFileSystem(hdfsUri); + } + } + + private static URI toHdfsFileSystemUri(String uri) { + if (uri == null) { + return null; + } + try { + URI parsedUri = new URI(uri); + if (!"hdfs".equalsIgnoreCase(parsedUri.getScheme())) { + return null; + } + return new URI(parsedUri.getScheme(), parsedUri.getAuthority(), null, null, null); + } catch (Exception e) { + return null; + } + } + + private static void closeHadoopFileSystem(URI hdfsUri) { + try { + Class confClass = Class.forName("org.apache.hadoop.conf.Configuration"); + Object conf = confClass.getDeclaredConstructor().newInstance(); + Class fsClass = Class.forName("org.apache.hadoop.fs.FileSystem"); + Method getMethod = fsClass.getMethod("get", URI.class, confClass); + Object fs = getMethod.invoke(null, hdfsUri, conf); + Method closeMethod = fsClass.getMethod("close"); + closeMethod.invoke(fs); + } catch (Exception e) { + // Best-effort cleanup; Hadoop may not be on classpath or FileSystem already closed + } } private static long createNative( diff --git a/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java b/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java new file mode 100644 index 0000000000..a4d47f5b2a --- /dev/null +++ b/dataset/src/test/java/org/apache/arrow/dataset/file/TestHdfsFileSystemCleanup.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.dataset.file; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Validates the fix for #1067: JVM + * hangs after reading HDFS files via Arrow Dataset API due to non-daemon native threads. + * + *

When Arrow C++ accesses HDFS via libhdfs, the C library attaches native threads to the JVM via + * {@code AttachCurrentThread}. These threads are non-daemon by default. They block on HDFS IPC + * connections managed by cached {@code FileSystem} instances, preventing the JVM from exiting after + * {@code main()} returns. + * + *

These tests fork a child JVM that simulates this behavior: a non-daemon thread holds an HDFS + * connection. Without cleanup, the JVM hangs. With {@code FileSystem.close()} on the cached + * instance (the same mechanism used by {@link FileSystemDatasetFactory#close()}), the connection is + * closed and the thread exits, allowing the JVM to terminate. + */ +public class TestHdfsFileSystemCleanup { + + private static final int CHILD_TIMEOUT_SECONDS = 10; + + private static MiniDFSCluster cluster; + private static Configuration conf; + + @TempDir static File clusterDir; + + @BeforeAll + static void startCluster() throws IOException { + conf = new Configuration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterDir.getAbsolutePath()); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + } + + @AfterAll + static void stopCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Without cleanup: a child JVM with a non-daemon thread holding an HDFS connection hangs after + * main() returns because the non-daemon thread keeps the JVM alive. + */ + @Test + void testJvmHangsWithoutCleanup() throws Exception { + Process child = forkChildProcess(false); + boolean exited = child.waitFor(CHILD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!exited) { + child.destroyForcibly(); + } + assertFalse(exited, "JVM should hang when non-daemon HDFS thread is not cleaned up"); + } + + /** + * With cleanup: closing the specific cached Hadoop FileSystem (the same mechanism used by {@link + * FileSystemDatasetFactory#close()}) closes the HDFS connections, causing the non-daemon thread + * to exit and allowing the JVM to terminate normally. + */ + @Test + void testJvmExitsWithCleanup() throws Exception { + Process child = forkChildProcess(true); + boolean exited = child.waitFor(CHILD_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!exited) { + child.destroyForcibly(); + } + assertTrue( + exited, "JVM should exit when FileSystemDatasetFactory cleanup runs before return"); + assertEquals(0, child.exitValue(), "Child process should exit cleanly (exit code 0)"); + } + + private Process forkChildProcess(boolean withCleanup) throws IOException { + String classpath = System.getProperty("java.class.path"); + int port = cluster.getNameNodePort(); + ProcessBuilder pb = + new ProcessBuilder( + ProcessHandle.current().info().command().orElse("java"), + "-cp", + classpath, + HdfsClientSimulator.class.getName(), + String.valueOf(port), + String.valueOf(withCleanup)); + pb.redirectError(ProcessBuilder.Redirect.DISCARD); + pb.redirectOutput(ProcessBuilder.Redirect.DISCARD); + return pb.start(); + } + + /** + * Simulates the libhdfs behavior in a standalone JVM. + * + *

When Arrow C++ uses libhdfs to access HDFS, native threads are attached to the JVM via + * {@code AttachCurrentThread}. These threads are non-daemon by default and block on HDFS IPC + * connections. This class simulates that by creating a non-daemon thread that periodically polls + * the HDFS filesystem. Without closing the cached FileSystem, the thread keeps the JVM alive + * indefinitely after {@code main()} returns. + */ + public static class HdfsClientSimulator { + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + boolean withCleanup = Boolean.parseBoolean(args[1]); + + Configuration conf = new Configuration(); + String hdfsUri = "hdfs://localhost:" + port; + conf.set("fs.defaultFS", hdfsUri); + + // Get a cached FileSystem (same as libhdfs does internally) + FileSystem fs = FileSystem.get(conf); + fs.exists(new Path("/")); + + // Simulate a non-daemon thread attached via AttachCurrentThread that + // blocks on the HDFS IPC connection. In the real libhdfs scenario, + // these are native threads that process RPC responses. + Thread connectionThread = + new Thread( + () -> { + while (true) { + try { + fs.getFileStatus(new Path("/")); + Thread.sleep(500); + } catch (Exception e) { + // FileSystem was closed or thread interrupted — exit + break; + } + } + }); + connectionThread.setDaemon(false); + connectionThread.setName("simulated-libhdfs-ipc-thread"); + connectionThread.start(); + + if (withCleanup) { + // Use the same helper invoked by FileSystemDatasetFactory.close(). + // Closing the cached FileSystem terminates IPC connections, causing + // the non-daemon thread to get an IOException and exit. + FileSystemDatasetFactory.closeHadoopFileSystemsIfHdfs(hdfsUri); + connectionThread.join(5000); + } + + // main() returns. Without cleanup, the non-daemon thread keeps the JVM alive. + } + } +}