diff --git a/ddprof-lib/src/main/cpp/arguments.cpp b/ddprof-lib/src/main/cpp/arguments.cpp index 72b8aec22..7767e64cd 100644 --- a/ddprof-lib/src/main/cpp/arguments.cpp +++ b/ddprof-lib/src/main/cpp/arguments.cpp @@ -374,6 +374,21 @@ Error Arguments::parse(const char *args) { } } + CASE("nativemem") + _nativemem = value == NULL ? 0 : parseUnits(value, BYTES); + if (_nativemem < 0) { + msg = "nativemem must be >= 0"; + } + + CASE("natsock") + _nativesocket = true; + if (value != NULL) { + _nativesocket_interval = parseUnits(value, NANOS); + if (_nativesocket_interval < 0) { + msg = "natsock interval must be >= 0"; + } + } + DEFAULT() if (_unknown_arg == NULL) _unknown_arg = arg; @@ -385,7 +400,7 @@ Error Arguments::parse(const char *args) { return Error(msg); } - if (_event == NULL && _cpu < 0 && _wall < 0 && _memory < 0) { + if (_event == NULL && _cpu < 0 && _wall < 0 && _memory < 0 && _nativemem < 0) { _event = EVENT_CPU; } diff --git a/ddprof-lib/src/main/cpp/arguments.h b/ddprof-lib/src/main/cpp/arguments.h index 462be5b53..1899e25d1 100644 --- a/ddprof-lib/src/main/cpp/arguments.h +++ b/ddprof-lib/src/main/cpp/arguments.h @@ -1,5 +1,6 @@ /* * Copyright 2017 Andrei Pangin + * Copyright 2026, Datadog, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -99,9 +100,10 @@ enum EventMask { EM_LOCK = 4, EM_WALL = 8, EM_NATIVEMEM = 16, - EM_METHOD_TRACE = 32 + EM_METHOD_TRACE = 32, + EM_NATIVESOCKET = 64 }; -constexpr int EVENT_MASK_SIZE = 6; +constexpr int EVENT_MASK_SIZE = 7; struct StackWalkFeatures { // Deprecated stack recovery techniques used to workaround AsyncGetCallTrace flaws @@ -174,6 +176,7 @@ class Arguments { double _live_samples_ratio; bool _record_heap_usage; bool _gc_generations; + long _nativemem; int _jstackdepth; int _safe_mode; StackWalkFeatures _features; @@ -189,6 +192,8 @@ class Arguments { bool _lightweight; bool _enable_method_cleanup; bool _remote_symbolication; // Enable remote symbolication for native frames + bool _nativesocket; + long _nativesocket_interval; // initial sampling period in nanoseconds; 0 = engine default Arguments(bool persistent = false) : _buf(NULL), @@ -209,6 +214,7 @@ class Arguments { _live_samples_ratio(0.1), // default to liveness-tracking 10% of the allocation samples _record_heap_usage(false), _gc_generations(false), + _nativemem(-1), _jstackdepth(DEFAULT_JSTACKDEPTH), _safe_mode(0), _features{1, 1, 1, 1, 1, 1}, @@ -223,7 +229,9 @@ class Arguments { _context_attributes({}), _lightweight(false), _enable_method_cleanup(true), - _remote_symbolication(false) {} + _remote_symbolication(false), + _nativesocket(false), + _nativesocket_interval(0) {} ~Arguments(); diff --git a/ddprof-lib/src/main/cpp/codeCache.cpp b/ddprof-lib/src/main/cpp/codeCache.cpp index 9ce154687..5203d5c5d 100644 --- a/ddprof-lib/src/main/cpp/codeCache.cpp +++ b/ddprof-lib/src/main/cpp/codeCache.cpp @@ -1,5 +1,6 @@ /* * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. 
* SPDX-License-Identifier: Apache-2.0 */ @@ -308,6 +309,11 @@ void CodeCache::saveImport(ImportId id, void** entry) { void CodeCache::addImport(void **entry, const char *name) { switch (name[0]) { + case 'a': + if (strcmp(name, "aligned_alloc") == 0) { + saveImport(im_aligned_alloc, entry); + } + break; case 'c': if (strcmp(name, "calloc") == 0) { saveImport(im_calloc, entry); @@ -337,18 +343,31 @@ void CodeCache::addImport(void **entry, const char *name) { saveImport(im_pthread_setspecific, entry); } else if (strcmp(name, "poll") == 0) { saveImport(im_poll, entry); + } else if (strcmp(name, "posix_memalign") == 0) { + saveImport(im_posix_memalign, entry); } break; case 'r': if (strcmp(name, "realloc") == 0) { saveImport(im_realloc, entry); + } else if (strcmp(name, "recv") == 0) { + saveImport(im_recv, entry); + } else if (strcmp(name, "read") == 0) { + saveImport(im_read, entry); } break; case 's': - if (strcmp(name, "sigaction") == 0) { + if (strcmp(name, "send") == 0) { + saveImport(im_send, entry); + } else if (strcmp(name, "sigaction") == 0) { saveImport(im_sigaction, entry); } break; + case 'w': + if (strcmp(name, "write") == 0) { + saveImport(im_write, entry); + } + break; } } diff --git a/ddprof-lib/src/main/cpp/codeCache.h b/ddprof-lib/src/main/cpp/codeCache.h index 170ac22a3..92b45bf47 100644 --- a/ddprof-lib/src/main/cpp/codeCache.h +++ b/ddprof-lib/src/main/cpp/codeCache.h @@ -1,5 +1,6 @@ /* * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. * SPDX-License-Identifier: Apache-2.0 */ @@ -34,7 +35,13 @@ enum ImportId { im_calloc, im_realloc, im_free, + im_posix_memalign, + im_aligned_alloc, im_sigaction, + im_send, + im_recv, + im_write, + im_read, NUM_IMPORTS }; diff --git a/ddprof-lib/src/main/cpp/event.h b/ddprof-lib/src/main/cpp/event.h index e9363165f..0e334478d 100644 --- a/ddprof-lib/src/main/cpp/event.h +++ b/ddprof-lib/src/main/cpp/event.h @@ -1,5 +1,6 @@ /* * Copyright 2020 Andrei Pangin + * Copyright 2026, Datadog, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,6 +89,29 @@ class ObjectLivenessEvent : public Event { Context _ctx; }; +class MallocEvent : public Event { +public: + u64 _start_time; + uintptr_t _address; + u64 _size; // 0 for free events + float _weight; + + MallocEvent() : Event(), _start_time(0), _address(0), _size(0), _weight(1.0f) {} +}; + +class NativeSocketEvent : public Event { +public: + u64 _start_time; // TSC ticks at call entry + u64 _end_time; // TSC ticks at call return + u8 _operation; // 0 = SEND, 1 = RECV + char _remote_addr[64]; // "ip:port" null-terminated string + u64 _bytes; // bytes transferred (return value of send/recv/write/read) + float _weight; // inverse-transform sample weight + + NativeSocketEvent() : Event(), _start_time(0), _end_time(0), _operation(0), + _bytes(0), _weight(1.0f) { _remote_addr[0] = '\0'; } +}; + class WallClockEpochEvent { public: bool _dirty; diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index eeaa3f32d..5b537f4a8 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -906,6 +906,10 @@ void Recording::writeSettings(Buffer *buf, Arguments &args) { writeBoolSetting(buf, T_ALLOC, "enabled", args._record_allocations); writeBoolSetting(buf, T_HEAP_LIVE_OBJECT, "enabled", args._record_liveness); + writeBoolSetting(buf, T_MALLOC, "enabled", args._nativemem >= 0); + if (args._nativemem >= 0) { + writeIntSetting(buf, T_MALLOC, "nativemem", args._nativemem); + } writeBoolSetting(buf, T_ACTIVE_RECORDING, "debugSymbols", VMStructs::libjvm()->hasDebugSymbols()); @@ -1575,6 +1579,38 @@ void Recording::recordAllocation(RecordingBuffer *buf, int tid, flushIfNeeded(buf); } +void Recording::recordMallocSample(Buffer *buf, int tid, u64 call_trace_id, + MallocEvent *event) { + int start = buf->skip(1); + buf->putVar64(T_MALLOC); + 
buf->putVar64(event->_start_time); + buf->putVar32(tid); + buf->putVar64(call_trace_id); + buf->putVar64(event->_address); + buf->putVar64(event->_size); + buf->putFloat(event->_weight); + writeCurrentContext(buf); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + +void Recording::recordNativeSocketSample(Buffer *buf, int tid, u64 call_trace_id, + NativeSocketEvent *event) { + int start = buf->skip(1); + buf->putVar64(T_NATIVE_SOCKET); + buf->putVar64(event->_start_time); + buf->putVar32(tid); + buf->putVar64(call_trace_id); + buf->putVar64(event->_end_time - event->_start_time); + buf->putUtf8(event->_operation == 0 ? "SEND" : "RECV"); + buf->putUtf8(event->_remote_addr); + buf->putVar64(event->_bytes); + buf->putFloat(event->_weight); + writeCurrentContext(buf); + writeEventSizePrefix(buf, start); + flushIfNeeded(buf); +} + void Recording::recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id, ObjectLivenessEvent *event) { int start = buf->skip(1); @@ -1817,6 +1853,12 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id, case BCI_PARK: rec->recordThreadPark(buf, tid, call_trace_id, (LockEvent *)event); break; + case BCI_NATIVE_MALLOC: + rec->recordMallocSample(buf, tid, call_trace_id, (MallocEvent *)event); + break; + case BCI_NATIVE_SOCKET: + rec->recordNativeSocketSample(buf, tid, call_trace_id, (NativeSocketEvent *)event); + break; } rec->flushIfNeeded(buf); rec->addThread(lock_index, tid); diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index 02efdebc0..5ea2b0dc1 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -281,6 +281,10 @@ class Recording { void recordQueueTime(Buffer *buf, int tid, QueueTimeEvent *event); void recordAllocation(RecordingBuffer *buf, int tid, u64 call_trace_id, AllocEvent *event); + void recordMallocSample(Buffer *buf, int tid, u64 call_trace_id, + MallocEvent *event); + void 
recordNativeSocketSample(Buffer *buf, int tid, u64 call_trace_id, + NativeSocketEvent *event); void recordHeapLiveObject(Buffer *buf, int tid, u64 call_trace_id, ObjectLivenessEvent *event); void recordMonitorBlocked(Buffer *buf, int tid, u64 call_trace_id, diff --git a/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp b/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp index f1aa809f3..1fb6e9dd3 100644 --- a/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp +++ b/ddprof-lib/src/main/cpp/hotspot/hotspotSupport.cpp @@ -66,6 +66,8 @@ inline EventType eventTypeFromBCI(jint bci_type) { return LOCK_SAMPLE; case BCI_PARK: return PARK_SAMPLE; + case BCI_NATIVE_MALLOC: + return MALLOC_SAMPLE; default: // For unknown or invalid BCI types, default to EXECUTION_SAMPLE // This maintains backward compatibility and prevents undefined behavior @@ -955,6 +957,10 @@ int HotspotSupport::walkJavaStack(StackWalkRequest& request) { int java_frames = 0; if (features.mixed) { java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated); + } else if (request.event_type == BCI_NATIVE_MALLOC || request.event_type == BCI_NATIVE_SOCKET) { + if (cstack >= CSTACK_VM) { + java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated); + } } else if (request.event_type == BCI_CPU || request.event_type == BCI_WALL) { if (cstack >= CSTACK_VM) { java_frames = walkVM(ucontext, frames, max_depth, features, eventTypeFromBCI(request.event_type), lock_index, truncated); diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.cpp b/ddprof-lib/src/main/cpp/jfrMetadata.cpp index 127f99c87..2830396d7 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.cpp +++ b/ddprof-lib/src/main/cpp/jfrMetadata.cpp @@ -1,5 +1,6 @@ /* * Copyright 2020 Andrei Pangin + * Copyright 2026, Datadog, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -298,6 +299,32 @@ void JfrMetadata::initialize( << field("name", T_STRING, "Name") << field("count", T_LONG, "Count")) + << (type("profiler.Malloc", T_MALLOC, "malloc") + << category("Java Virtual Machine", "Native Memory") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL) + << field("address", T_LONG, "Address", F_ADDRESS) + << field("size", T_LONG, "Size", F_BYTES) + << field("weight", T_FLOAT, "Sample weight") + << field("spanId", T_LONG, "Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") || + contextAttributes) + + << (type("datadog.NativeSocketEvent", T_NATIVE_SOCKET, "Native Socket I/O") + << category("Datadog", "Profiling") + << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) + << field("eventThread", T_THREAD, "Event Thread", F_CPOOL) + << field("stackTrace", T_STACK_TRACE, "Stack Trace", F_CPOOL) + << field("duration", T_LONG, "Duration", F_DURATION_TICKS) + << field("operation", T_STRING, "Operation") + << field("remoteAddress", T_STRING, "Remote Address") + << field("bytesTransferred", T_LONG, "Bytes Transferred", F_BYTES) + << field("weight", T_FLOAT, "Sample weight") + << field("spanId", T_LONG, "Span ID") + << field("localRootSpanId", T_LONG, "Local Root Span ID") || + contextAttributes) + << (type("jdk.OSInformation", T_OS_INFORMATION, "OS Information") << category("Operating System") << field("startTime", T_LONG, "Start Time", F_TIME_TICKS) diff --git a/ddprof-lib/src/main/cpp/jfrMetadata.h b/ddprof-lib/src/main/cpp/jfrMetadata.h index b6cfc054a..ac241a7a8 100644 --- a/ddprof-lib/src/main/cpp/jfrMetadata.h +++ b/ddprof-lib/src/main/cpp/jfrMetadata.h @@ -1,5 +1,6 @@ /* * Copyright 2020 Andrei Pangin + * Copyright 2026, Datadog, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +79,8 @@ enum JfrType { T_DATADOG_CLASSREF_CACHE = 124, T_DATADOG_COUNTER = 125, T_UNWIND_FAILURE = 126, + T_MALLOC = 127, + T_NATIVE_SOCKET = 128, T_ANNOTATION = 200, T_LABEL = 201, T_CATEGORY = 202, diff --git a/ddprof-lib/src/main/cpp/jvmSupport.cpp b/ddprof-lib/src/main/cpp/jvmSupport.cpp index 661eca786..93bbe00a0 100644 --- a/ddprof-lib/src/main/cpp/jvmSupport.cpp +++ b/ddprof-lib/src/main/cpp/jvmSupport.cpp @@ -20,7 +20,10 @@ int JVMSupport::walkJavaStack(StackWalkRequest& request) { if (VM::isHotspot()) { return HotspotSupport::walkJavaStack(request); } else if (VM::isOpenJ9() || VM::isZing()) { - assert(request.event_type == BCI_CPU || request.event_type == BCI_WALL); + assert(request.event_type == BCI_CPU || + request.event_type == BCI_WALL || + request.event_type == BCI_NATIVE_MALLOC || + request.event_type == BCI_NATIVE_SOCKET); return asyncGetCallTrace(request.frames, request.max_depth, request.ucontext); } assert(false && "Unsupported JVM"); diff --git a/ddprof-lib/src/main/cpp/libraryPatcher.h b/ddprof-lib/src/main/cpp/libraryPatcher.h index 39cf6822c..fe7d4ec02 100644 --- a/ddprof-lib/src/main/cpp/libraryPatcher.h +++ b/ddprof-lib/src/main/cpp/libraryPatcher.h @@ -28,15 +28,31 @@ class LibraryPatcher { static PatchEntry _sigaction_entries[MAX_NATIVE_LIBS]; static int _sigaction_size; + // Separate tracking for socket (send/recv/write/read) patches. + static PatchEntry _socket_entries[MAX_NATIVE_LIBS]; + static int _socket_size; + static void patch_library_unlocked(CodeCache* lib); static void patch_pthread_create(); static void patch_pthread_setspecific(); static void patch_sigaction_in_library(CodeCache* lib); public: + // True while socket hooks are installed; read by Profiler::dlopen_hook + // to decide whether to re-patch after a new library is loaded. 
+ static bool _socket_active; static void initialize(); static void patch_libraries(); static void unpatch_libraries(); static void patch_sigaction(); + static void patch_socket_functions(); + static void unpatch_socket_functions(); + // Called from Profiler::dlopen_hook after a new library is loaded. + // No-op when socket hooks are not active. + static inline void install_socket_hooks() { + if (_socket_active) { + patch_socket_functions(); + } + } }; #else @@ -47,8 +63,11 @@ class LibraryPatcher { static void patch_libraries() { } static void unpatch_libraries() { } static void patch_sigaction() { } + static void patch_socket_functions() { } + static void unpatch_socket_functions() { } + static void install_socket_hooks() { } }; #endif -#endif // _LIBRARYPATCHER_H \ No newline at end of file +#endif // _LIBRARYPATCHER_H diff --git a/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp b/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp index d1bc8c23d..afd8e2b61 100644 --- a/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp +++ b/ddprof-lib/src/main/cpp/libraryPatcher_linux.cpp @@ -7,8 +7,9 @@ #ifdef __linux__ #include "counters.h" -#include "profiler.h" #include "guards.h" +#include "nativeSocketSampler.h" +#include "profiler.h" #include #include @@ -24,6 +25,9 @@ PatchEntry LibraryPatcher::_patched_entries[MAX_NATIVE_LIBS]; int LibraryPatcher::_size = 0; PatchEntry LibraryPatcher::_sigaction_entries[MAX_NATIVE_LIBS]; int LibraryPatcher::_sigaction_size = 0; +PatchEntry LibraryPatcher::_socket_entries[MAX_NATIVE_LIBS]; +int LibraryPatcher::_socket_size = 0; +bool LibraryPatcher::_socket_active = false; void LibraryPatcher::initialize() { if (_profiler_name == nullptr) { @@ -319,4 +323,135 @@ void LibraryPatcher::patch_sigaction() { } } +#if defined(__GLIBC__) +void LibraryPatcher::patch_socket_functions() { + // Pre-resolve the real libc symbols via RTLD_NEXT *before* acquiring _lock. 
+ // RTLD_NEXT finds the first definition after this DSO in load order, + // bypassing unresolved lazy-binding stubs that would otherwise trigger + // _dl_runtime_resolve and silently overwrite the hook in the GOT. + // May resolve to an LD_PRELOAD interposer (e.g. libasan) — intentional. + auto pre_send = (NativeSocketSampler::send_fn) dlsym(RTLD_NEXT, "send"); + auto pre_recv = (NativeSocketSampler::recv_fn) dlsym(RTLD_NEXT, "recv"); + auto pre_write = (NativeSocketSampler::write_fn) dlsym(RTLD_NEXT, "write"); + auto pre_read = (NativeSocketSampler::read_fn) dlsym(RTLD_NEXT, "read"); + TEST_LOG("patch_socket_functions dlsym send=%p recv=%p write=%p read=%p", + (void*)pre_send, (void*)pre_recv, (void*)pre_write, (void*)pre_read); + if (!pre_send || !pre_recv || !pre_write || !pre_read) { + TEST_LOG("patch_socket_functions EARLY RETURN: at least one dlsym returned NULL"); + return; + } + + const CodeCacheArray& native_libs = Libraries::instance()->native_libs(); + int num_of_libs = native_libs.count(); + ExclusiveLockGuard locker(&_lock); + // Assign pre-resolved pointers unconditionally — in-flight hook invocations + // dereference _orig_send/_recv/_write/_read with no guard; a null window + // between reset and re-assignment would crash. 
+ NativeSocketSampler::_orig_send = pre_send; + NativeSocketSampler::_orig_recv = pre_recv; + NativeSocketSampler::_orig_write = pre_write; + NativeSocketSampler::_orig_read = pre_read; + for (int index = 0; index < num_of_libs; index++) { + CodeCache* lib = native_libs.at(index); + if (lib == nullptr) continue; + if (lib->name() == nullptr) continue; + + char path[PATH_MAX]; + char* resolved_path = realpath(lib->name(), path); + if (resolved_path != nullptr && strcmp(resolved_path, _profiler_name) == 0) { + continue; + } + + void** send_location = (void**)lib->findImport(im_send); + void** recv_location = (void**)lib->findImport(im_recv); + void** write_location = (void**)lib->findImport(im_write); + void** read_location = (void**)lib->findImport(im_read); + + if (send_location == nullptr && recv_location == nullptr + && write_location == nullptr && read_location == nullptr) continue; + + bool already_patched = false; + for (int i = 0; i < _socket_size; i++) { + if (_socket_entries[i]._lib == lib) { + already_patched = true; + break; + } + } + if (already_patched) { + TEST_LOG("patch_socket_functions SKIP (already patched): %s", lib->name()); + continue; + } + + // Count how many slots this library needs before committing any patches. + int needed = (send_location != nullptr ? 1 : 0) + + (recv_location != nullptr ? 1 : 0) + + (write_location != nullptr ? 1 : 0) + + (read_location != nullptr ? 
1 : 0); + if (_socket_size + needed > MAX_NATIVE_LIBS) { + TEST_LOG("patch_socket_functions SKIP (MAX_NATIVE_LIBS): %s needed=%d _socket_size=%d", + lib->name(), needed, _socket_size); + continue; + } + TEST_LOG("patch_socket_functions PATCH %s send=%p recv=%p write=%p read=%p", + lib->name(), (void*)send_location, (void*)recv_location, + (void*)write_location, (void*)read_location); + + if (send_location != nullptr) { + void* orig = (void*)__atomic_load_n(send_location, __ATOMIC_RELAXED); + _socket_entries[_socket_size]._lib = lib; + _socket_entries[_socket_size]._location = send_location; + _socket_entries[_socket_size]._func = orig; + __atomic_store_n(send_location, (void*)NativeSocketSampler::send_hook, __ATOMIC_RELAXED); + _socket_size++; + } + if (recv_location != nullptr) { + void* orig = (void*)__atomic_load_n(recv_location, __ATOMIC_RELAXED); + _socket_entries[_socket_size]._lib = lib; + _socket_entries[_socket_size]._location = recv_location; + _socket_entries[_socket_size]._func = orig; + __atomic_store_n(recv_location, (void*)NativeSocketSampler::recv_hook, __ATOMIC_RELAXED); + _socket_size++; + } + if (write_location != nullptr) { + void* orig = (void*)__atomic_load_n(write_location, __ATOMIC_RELAXED); + _socket_entries[_socket_size]._lib = lib; + _socket_entries[_socket_size]._location = write_location; + _socket_entries[_socket_size]._func = orig; + __atomic_store_n(write_location, (void*)NativeSocketSampler::write_hook, __ATOMIC_RELAXED); + _socket_size++; + } + if (read_location != nullptr) { + void* orig = (void*)__atomic_load_n(read_location, __ATOMIC_RELAXED); + _socket_entries[_socket_size]._lib = lib; + _socket_entries[_socket_size]._location = read_location; + _socket_entries[_socket_size]._func = orig; + __atomic_store_n(read_location, (void*)NativeSocketSampler::read_hook, __ATOMIC_RELAXED); + _socket_size++; + } + } + + TEST_LOG("patch_socket_functions DONE total_slots=%d num_libs_scanned=%d", + _socket_size, num_of_libs); + _socket_active 
= true; +} + +void LibraryPatcher::unpatch_socket_functions() { + ExclusiveLockGuard locker(&_lock); + _socket_active = false; + TEST_LOG("unpatch_socket_functions restoring %d slot(s)", _socket_size); + for (int index = 0; index < _socket_size; index++) { + __atomic_store_n(_socket_entries[index]._location, _socket_entries[index]._func, __ATOMIC_RELAXED); + } + _socket_size = 0; + // _orig_send/_orig_recv/_orig_write/_orig_read are intentionally NOT nulled. + // In-flight hook invocations that entered before PLT entries were restored + // above may still be executing and will dereference these pointers. + // They remain valid (pointing to the real libc functions) until the next + // patch_socket_functions() call. +} +#else // !__GLIBC__ +void LibraryPatcher::patch_socket_functions() {} +void LibraryPatcher::unpatch_socket_functions() {} +#endif // __GLIBC__ + #endif // __linux__ diff --git a/ddprof-lib/src/main/cpp/livenessTracker.cpp b/ddprof-lib/src/main/cpp/livenessTracker.cpp index afb2c1f74..cae96fabe 100644 --- a/ddprof-lib/src/main/cpp/livenessTracker.cpp +++ b/ddprof-lib/src/main/cpp/livenessTracker.cpp @@ -211,6 +211,10 @@ void LivenessTracker::stop() { Error LivenessTracker::initialize(Arguments &args) { _enabled = args._gc_generations || args._record_liveness; + // Per-call toggles that must reflect the current start's arguments even when + // the tracker's table is reused from a prior initialize (_initialized=true). + // On musl the test JVM is not forked per test, so the singleton survives. 
+ _record_heap_usage = args._record_heap_usage; if (!_enabled) { return Error::OK; @@ -265,8 +269,6 @@ Error LivenessTracker::initialize(Arguments &args) { // enough for 1G of heap _table = (TrackingEntry *)malloc(sizeof(TrackingEntry) * _table_cap); - _record_heap_usage = args._record_heap_usage; - _gc_epoch = 0; _last_gc_epoch = 0; diff --git a/ddprof-lib/src/main/cpp/mallocTracer.cpp b/ddprof-lib/src/main/cpp/mallocTracer.cpp new file mode 100644 index 000000000..6a6aabe83 --- /dev/null +++ b/ddprof-lib/src/main/cpp/mallocTracer.cpp @@ -0,0 +1,345 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include "codeCache.h" +#include "libraries.h" +#include "mallocTracer.h" +#include "os.h" +#include "pidController.h" +#include "profiler.h" +#include "symbols.h" +#include "tsc.h" +#include "vmEntry.h" + +#ifdef __clang__ +# define NO_OPTIMIZE __attribute__((optnone)) +#else +# define NO_OPTIMIZE __attribute__((optimize("-fno-omit-frame-pointer,-fno-optimize-sibling-calls"))) +#endif + +#define SAVE_IMPORT(FUNC) \ + do { \ + void** _entry = lib->findImport(im_##FUNC); \ + if (_entry != NULL) _orig_##FUNC = (decltype(_orig_##FUNC))*_entry; \ + } while (0) + +static void* (*_orig_malloc)(size_t); +static void (*_orig_free)(void*); +static void* (*_orig_calloc)(size_t, size_t); +static void* (*_orig_realloc)(void*, size_t); +static int (*_orig_posix_memalign)(void**, size_t, size_t); +static void* (*_orig_aligned_alloc)(size_t, size_t); + +extern "C" void* malloc_hook(size_t size) { + void* ret = _orig_malloc(size); + if (MallocTracer::running() && ret && size) { + MallocTracer::recordMalloc(ret, size); + } + return ret; +} + +extern "C" void* calloc_hook(size_t num, size_t size) { + void* ret = _orig_calloc(num, size); + if (MallocTracer::running() && ret && num && size) { + size_t total; + if (__builtin_mul_overflow(num, size, &total)) { + 
total = SIZE_MAX; + } + MallocTracer::recordMalloc(ret, total); + } + return ret; +} + +// Make sure this is not optimized away (function-scoped -fno-optimize-sibling-calls) +extern "C" NO_OPTIMIZE +void* calloc_hook_dummy(size_t num, size_t size) { + return _orig_calloc(num, size); +} + +extern "C" void* realloc_hook(void* addr, size_t size) { + void* ret = _orig_realloc(addr, size); + if (MallocTracer::running() && ret != NULL && size > 0) { + MallocTracer::recordMalloc(ret, size); + } + return ret; +} + +extern "C" void free_hook(void* addr) { + _orig_free(addr); +} + +extern "C" int posix_memalign_hook(void** memptr, size_t alignment, size_t size) { + int ret = _orig_posix_memalign(memptr, alignment, size); + if (MallocTracer::running() && ret == 0 && memptr && *memptr && size) { + MallocTracer::recordMalloc(*memptr, size); + } + return ret; +} + +// Make sure this is not optimized away (function-scoped -fno-optimize-sibling-calls) +extern "C" NO_OPTIMIZE +int posix_memalign_hook_dummy(void** memptr, size_t alignment, size_t size) { + return _orig_posix_memalign(memptr, alignment, size); +} + +extern "C" void* aligned_alloc_hook(size_t alignment, size_t size) { + void* ret = _orig_aligned_alloc(alignment, size); + if (MallocTracer::running() && ret && size) { + MallocTracer::recordMalloc(ret, size); + } + return ret; +} + +volatile u64 MallocTracer::_interval; +volatile u64 MallocTracer::_bytes_until_sample; +u64 MallocTracer::_configured_interval; +volatile u64 MallocTracer::_sample_count; +volatile u64 MallocTracer::_last_config_update_ts; + +Mutex MallocTracer::_patch_lock; +int MallocTracer::_patched_libs = 0; +bool MallocTracer::_initialized = false; +volatile bool MallocTracer::_running = false; + +static pthread_t _current_thread; +static volatile bool _nested_malloc = false; +static volatile bool _nested_posix_memalign = false; + +// Test if calloc() implementation calls malloc() +static void* nested_malloc_hook(size_t size) { + if (pthread_self() == 
_current_thread) { + _nested_malloc = true; + } + return _orig_malloc(size); +} + +// Test if posix_memalign() implementation calls aligned_alloc() +static void* nested_aligned_alloc_hook(size_t alignment, size_t size) { + if (pthread_self() == _current_thread) { + _nested_posix_memalign = true; + } + return _orig_aligned_alloc(alignment, size); +} + +// In some implementations, specifically on musl, calloc() calls malloc() internally, +// and posix_memalign() calls aligned_alloc(). Detect such cases to prevent double-accounting. +static void detectNestedMalloc() { + if (_orig_malloc != NULL && _orig_calloc != NULL) { + CodeCache* libc = Profiler::instance()->findLibraryByAddress((void*)_orig_calloc); + if (libc != NULL) { + libc->patchImport(im_malloc, (void*)nested_malloc_hook); + + _current_thread = pthread_self(); + free(_orig_calloc(1, 1)); + _current_thread = pthread_t(0); + + // Restore original malloc so libc doesn't carry the probe hook until patchLibraries() runs. + libc->patchImport(im_malloc, (void*)_orig_malloc); + } + } + + if (_orig_posix_memalign != NULL && _orig_aligned_alloc != NULL) { + CodeCache* libc = Profiler::instance()->findLibraryByAddress((void*)_orig_posix_memalign); + if (libc != NULL) { + libc->patchImport(im_aligned_alloc, (void*)nested_aligned_alloc_hook); + + _current_thread = pthread_self(); + void* pm_probe = NULL; + _orig_posix_memalign(&pm_probe, sizeof(void*), sizeof(void*)); + _current_thread = pthread_t(0); + if (pm_probe != NULL) _orig_free(pm_probe); + + // Restore original aligned_alloc so libc doesn't carry the probe hook. + libc->patchImport(im_aligned_alloc, (void*)_orig_aligned_alloc); + } + } +} + +// Call each intercepted function at least once to ensure its GOT entry is updated +static void resolveMallocSymbols() { + static volatile intptr_t sink; + + void* p0 = malloc(1); + void* p1 = realloc(p0, 2); + if (p1 == NULL) { + // realloc failed; p0 is still valid and must be freed explicitly. 
+ free(p0); + } + void* p2 = calloc(1, 1); + void* p3 = aligned_alloc(1, 1); + void* p4 = NULL; + if (posix_memalign(&p4, sizeof(void*), sizeof(void*)) == 0) free(p4); + free(p3); + free(p2); + free(p1); + + sink = (intptr_t)p1 + (intptr_t)p2 + (intptr_t)p3 + (intptr_t)p4; +} + +void MallocTracer::initialize() { + CodeCache* lib = Profiler::instance()->findLibraryByAddress((void*)MallocTracer::initialize); + if (lib == NULL) { + return; + } + + resolveMallocSymbols(); + + SAVE_IMPORT(malloc); + SAVE_IMPORT(free); + SAVE_IMPORT(calloc); + SAVE_IMPORT(realloc); + SAVE_IMPORT(posix_memalign); + SAVE_IMPORT(aligned_alloc); + + detectNestedMalloc(); + + lib->mark( + [](const char* s) -> bool { + return strcmp(s, "malloc_hook") == 0 + || strcmp(s, "calloc_hook") == 0 + || strcmp(s, "realloc_hook") == 0 + || strcmp(s, "free_hook") == 0 + || strcmp(s, "posix_memalign_hook") == 0 + || strcmp(s, "aligned_alloc_hook") == 0; + }, + MARK_ASYNC_PROFILER); +} + +// To avoid complexity in hooking and tracking reentrancy, a TLS-based approach is not used. +// Reentrant allocation calls would result in double-accounting. However, this does not impact +// the leak detector, as it correctly tracks memory as freed regardless of how many times +// recordMalloc is called with the same address. 
+void MallocTracer::patchLibraries() { + MutexLocker ml(_patch_lock); + + const CodeCacheArray& native_libs = Libraries::instance()->native_libs(); + int native_lib_count = native_libs.count(); + + while (_patched_libs < native_lib_count) { + CodeCache* cc = native_libs[_patched_libs++]; + + UnloadProtection handle(cc); + if (!handle.isValid()) { + continue; + } + + if (_orig_malloc) cc->patchImport(im_malloc, (void*)malloc_hook); + if (_orig_realloc) cc->patchImport(im_realloc, (void*)realloc_hook); + if (_orig_free) cc->patchImport(im_free, (void*)free_hook); + if (_orig_aligned_alloc) cc->patchImport(im_aligned_alloc, (void*)aligned_alloc_hook); + + // Use dummy hooks for functions that call another tracked function internally, + // to prevent double-accounting. Dummy frames preserve the frame link to the caller. + if (_orig_calloc) { + cc->patchImport(im_calloc, _nested_malloc ? (void*)calloc_hook_dummy : (void*)calloc_hook); + } + if (_orig_posix_memalign) { + cc->patchImport(im_posix_memalign, _nested_posix_memalign ? 
(void*)posix_memalign_hook_dummy : (void*)posix_memalign_hook); + } + } +} + +u64 MallocTracer::nextPoissonInterval() { + u64 s = TSC::ticks(); + s ^= s >> 12; + s ^= s << 25; + s ^= s >> 27; + double u = (double)(s * 0x2545F4914F6CDD1DULL >> 11) / (double)(1ULL << 53); + if (u < 1e-18) u = 1e-18; + return (u64)((double)_interval * -log(u)); +} + +bool MallocTracer::shouldSample(size_t size) { + if (_interval <= 1) return true; + while (true) { + u64 prev = _bytes_until_sample; + if (size < prev) { + if (__sync_bool_compare_and_swap(&_bytes_until_sample, prev, prev - size)) + return false; + } else { + u64 next = nextPoissonInterval(); + if (__sync_bool_compare_and_swap(&_bytes_until_sample, prev, next)) + return true; + } + } +} + +void MallocTracer::updateConfiguration(u64 events, double time_coefficient) { + static PidController pid(TARGET_SAMPLES_PER_WINDOW, + 31, 511, 3, CONFIG_UPDATE_CHECK_PERIOD_SECS, 15); + double signal = pid.compute(events, time_coefficient); + int64_t new_interval = (int64_t)_interval - (int64_t)signal; + if (new_interval < (int64_t)_configured_interval) + new_interval = (int64_t)_configured_interval; + if (new_interval > (int64_t)(1ULL << 40)) + new_interval = (int64_t)(1ULL << 40); + _interval = (u64)new_interval; +} + +void MallocTracer::recordMalloc(void* address, size_t size) { + if (shouldSample(size)) { + u64 current_interval = _interval; + MallocEvent event; + event._start_time = TSC::ticks(); + event._address = (uintptr_t)address; + event._size = size; + event._weight = (float)(size == 0 || current_interval <= 1 + ? 
1.0 + : 1.0 / (1.0 - exp(-(double)size / (double)current_interval))); + + Profiler::instance()->recordSample(NULL, size, OS::threadId(), BCI_NATIVE_MALLOC, 0, &event); + + u64 current_samples = __sync_add_and_fetch(&_sample_count, 1); + if ((current_samples % TARGET_SAMPLES_PER_WINDOW) == 0) { + u64 now = OS::nanotime(); + u64 prev_ts = __atomic_load_n(&_last_config_update_ts, __ATOMIC_RELAXED); + u64 time_diff = now - prev_ts; + u64 check_period_ns = (u64)CONFIG_UPDATE_CHECK_PERIOD_SECS * 1000000000ULL; + if (time_diff > check_period_ns) { + if (__atomic_compare_exchange(&_last_config_update_ts, &prev_ts, &now, + false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { + __sync_fetch_and_add(&_sample_count, -current_samples); + updateConfiguration(current_samples, + (double)check_period_ns / time_diff); + } + } + } + } +} + +Error MallocTracer::start(Arguments& args) { + _configured_interval = args._nativemem > 0 ? args._nativemem : 0; + _interval = _configured_interval; + _bytes_until_sample = _configured_interval > 1 ? nextPoissonInterval() : 0; + _sample_count = 0; + __atomic_store_n(&_last_config_update_ts, OS::nanotime(), __ATOMIC_RELEASE); + + if (!_initialized) { + initialize(); + _initialized = true; + } + + if (_orig_malloc == NULL) { + return Error("Failed to resolve malloc symbols; native memory profiling unavailable"); + } + + // Patch first, then enable recording so hooks never run without valid _orig_* pointers. + patchLibraries(); + _running = true; + + return Error::OK; +} + +void MallocTracer::stop() { + // Ideally, we should reset original malloc entries, but it's not currently safe + // in the view of library unloading. Consider using dl_iterate_phdr. 
+ _running = false; +} diff --git a/ddprof-lib/src/main/cpp/mallocTracer.h b/ddprof-lib/src/main/cpp/mallocTracer.h new file mode 100644 index 000000000..484cb4240 --- /dev/null +++ b/ddprof-lib/src/main/cpp/mallocTracer.h @@ -0,0 +1,58 @@ +/* + * Copyright The async-profiler authors + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _MALLOCTRACER_H +#define _MALLOCTRACER_H + +#include +#include "engine.h" +#include "event.h" +#include "mutex.h" + +class MallocTracer : public Engine { + private: + static volatile u64 _interval; + static volatile u64 _bytes_until_sample; + + static u64 _configured_interval; + static volatile u64 _sample_count; + static volatile u64 _last_config_update_ts; + static const int CONFIG_UPDATE_CHECK_PERIOD_SECS = 1; + static const int TARGET_SAMPLES_PER_WINDOW = 100; + + static Mutex _patch_lock; + static int _patched_libs; + static bool _initialized; + static volatile bool _running; + + static void initialize(); + static void patchLibraries(); + static u64 nextPoissonInterval(); + static bool shouldSample(size_t size); + static void updateConfiguration(u64 events, double time_coefficient); + + public: + const char* name() { + return "MallocTracer"; + } + + Error start(Arguments& args); + void stop(); + + static inline bool running() { + return _running; + } + + static inline void installHooks() { + if (running()) { + patchLibraries(); + } + } + + static void recordMalloc(void* address, size_t size); +}; + +#endif // _MALLOCTRACER_H diff --git a/ddprof-lib/src/main/cpp/nativeSocketSampler.cpp b/ddprof-lib/src/main/cpp/nativeSocketSampler.cpp new file mode 100644 index 000000000..9d8426019 --- /dev/null +++ b/ddprof-lib/src/main/cpp/nativeSocketSampler.cpp @@ -0,0 +1,326 @@ +/* + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include "nativeSocketSampler.h" + +#if defined(__linux__) && defined(__GLIBC__) + +#include "common.h" +#include "flightRecorder.h" +#include "libraryPatcher.h" +#include "os.h" +#include "profiler.h" +#include "tsc.h" +#include "vmEntry.h" + +#include +#include +#include +#include +#include + +static thread_local PoissonSampler _send_sampler; +static thread_local PoissonSampler _recv_sampler; + +// Debug-only hook-fire counters, paired with TEST_LOG (common.h). Gated at +// compile time to keep release hot paths free of cross-thread atomic writes. +#ifdef DEBUG +static std::atomic _send_hook_calls{0}; +static std::atomic _recv_hook_calls{0}; +static std::atomic _write_hook_calls{0}; +static std::atomic _read_hook_calls{0}; +static std::atomic _record_accept_calls{0}; +static std::atomic _record_reject_calls{0}; +#endif + +NativeSocketSampler* const NativeSocketSampler::_instance = new NativeSocketSampler(); +NativeSocketSampler::send_fn NativeSocketSampler::_orig_send = nullptr; +NativeSocketSampler::recv_fn NativeSocketSampler::_orig_recv = nullptr; +NativeSocketSampler::write_fn NativeSocketSampler::_orig_write = nullptr; +NativeSocketSampler::read_fn NativeSocketSampler::_orig_read = nullptr; + +std::string NativeSocketSampler::resolveAddr(int fd) { + struct sockaddr_storage ss; + socklen_t len = sizeof(ss); + if (getpeername(fd, (struct sockaddr*)&ss, &len) != 0) { + return ""; + } + char host[INET6_ADDRSTRLEN]; + int port = 0; + if (ss.ss_family == AF_INET) { + struct sockaddr_in* s = (struct sockaddr_in*)&ss; + inet_ntop(AF_INET, &s->sin_addr, host, sizeof(host)); + port = ntohs(s->sin_port); + } else if (ss.ss_family == AF_INET6) { + struct sockaddr_in6* s = (struct sockaddr_in6*)&ss; + inet_ntop(AF_INET6, &s->sin6_addr, host, sizeof(host)); + port = ntohs(s->sin6_port); + } else { + return ""; + } + char buf[INET6_ADDRSTRLEN + 10]; + if (ss.ss_family == AF_INET6) { + snprintf(buf, sizeof(buf), "[%s]:%d", 
host, port); + } else { + snprintf(buf, sizeof(buf), "%s:%d", host, port); + } + return std::string(buf); +} + +bool NativeSocketSampler::isSocket(int fd) { + if ((unsigned)fd >= (unsigned)FD_TYPE_CACHE_SIZE) { + int so_type; + socklen_t solen = sizeof(so_type); + return getsockopt(fd, SOL_SOCKET, SO_TYPE, &so_type, &solen) == 0 + && so_type == SOCK_STREAM; + } + uint8_t cached = _fd_type_cache[fd].load(std::memory_order_relaxed); + if (cached == FD_TYPE_SOCKET) return true; + if (cached == FD_TYPE_NON_SOCKET) return false; + + int so_type; + socklen_t solen = sizeof(so_type); + bool tcp = getsockopt(fd, SOL_SOCKET, SO_TYPE, &so_type, &solen) == 0 + && so_type == SOCK_STREAM; + _fd_type_cache[fd].store(tcp ? FD_TYPE_SOCKET : FD_TYPE_NON_SOCKET, + std::memory_order_relaxed); + return tcp; +} + +bool NativeSocketSampler::shouldSample(u64 duration_ticks, int op, float &weight) { + PoissonSampler &sampler = (op == 0) ? _send_sampler : _recv_sampler; + return sampler.sample(duration_ticks, + (u64)_rate_limiter.interval(), + _rate_limiter.epoch(), + weight); +} + +void NativeSocketSampler::recordEvent(int fd, u64 t0, u64 t1, ssize_t bytes, u8 op) { + float weight = 0.0f; + bool sampled = shouldSample(t1 - t0, op, weight); + if (!sampled) { +#ifdef DEBUG + uint64_t n = _record_reject_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::recordEvent REJECT #%llu fd=%d op=%u bytes=%zd dur_ticks=%llu", + (unsigned long long)(n + 1), fd, (unsigned)op, bytes, + (unsigned long long)(t1 - t0)); + } +#endif + return; + } +#ifdef DEBUG + { + uint64_t n = _record_accept_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3F) == 0) { + TEST_LOG("NativeSocketSampler::recordEvent ACCEPT #%llu fd=%d op=%u bytes=%zd dur_ticks=%llu weight=%f", + (unsigned long long)(n + 1), fd, (unsigned)op, bytes, + (unsigned long long)(t1 - t0), (double)weight); + } + } +#endif + + std::string addr; + { + std::lock_guard 
lock(_fd_cache_mutex); + auto it = _fd_cache.find(fd); + if (it != _fd_cache.end()) { + addr = it->second; + } + } + if (addr.empty()) { + addr = resolveAddr(fd); + std::lock_guard lock(_fd_cache_mutex); + if ((int)_fd_cache.size() < MAX_FD_CACHE) { + _fd_cache.emplace(fd, addr); + } + } + + NativeSocketEvent event; + event._start_time = t0; + event._end_time = t1; + event._operation = op; + strncpy(event._remote_addr, addr.c_str(), sizeof(event._remote_addr) - 1); + event._remote_addr[sizeof(event._remote_addr) - 1] = '\0'; + event._bytes = (u64)bytes; + event._weight = weight; + + // Pass NULL ucontext (mirrors MallocTracer): recordSample routes + // BCI_NATIVE_SOCKET to walkVM with no signal context, which walks native + // frames via DWARF/FP and falls back to JavaFrameAnchor for Java frames. + Profiler::instance()->recordSample(NULL, (u64)bytes, OS::threadId(), + BCI_NATIVE_SOCKET, 0, &event); + + _rate_limiter.recordFire(); +} + +ssize_t NativeSocketSampler::send_hook(int fd, const void* buf, size_t len, int flags) { +#ifdef DEBUG + { + uint64_t n = _send_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::send_hook #%llu fd=%d len=%zu flags=0x%x", + (unsigned long long)(n + 1), fd, len, flags); + } + } +#endif + u64 t0 = TSC::ticks(); + ssize_t ret = _orig_send(fd, buf, len, flags); + u64 t1 = TSC::ticks(); + if (ret > 0) { + _instance->recordEvent(fd, t0, t1, ret, 0); + } + return ret; +} + +ssize_t NativeSocketSampler::recv_hook(int fd, void* buf, size_t len, int flags) { +#ifdef DEBUG + { + uint64_t n = _recv_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::recv_hook #%llu fd=%d len=%zu flags=0x%x", + (unsigned long long)(n + 1), fd, len, flags); + } + } +#endif + u64 t0 = TSC::ticks(); + ssize_t ret = _orig_recv(fd, buf, len, flags); + u64 t1 = TSC::ticks(); + if (ret > 0) { + _instance->recordEvent(fd, t0, t1, ret, 
1); + } + return ret; +} + +ssize_t NativeSocketSampler::write_hook(int fd, const void* buf, size_t len) { + NativeSocketSampler* self = _instance; + if (!self->isSocket(fd)) { +#ifdef DEBUG + uint64_t n = _write_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::write_hook #%llu fd=%d len=%zu is_socket=0", + (unsigned long long)(n + 1), fd, len); + } +#endif + return _orig_write(fd, buf, len); + } +#ifdef DEBUG + { + uint64_t n = _write_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::write_hook #%llu fd=%d len=%zu is_socket=1", + (unsigned long long)(n + 1), fd, len); + } + } +#endif + u64 t0 = TSC::ticks(); + ssize_t ret = _orig_write(fd, buf, len); + u64 t1 = TSC::ticks(); + if (ret > 0) { + self->recordEvent(fd, t0, t1, ret, 0); + } + return ret; +} + +ssize_t NativeSocketSampler::read_hook(int fd, void* buf, size_t len) { + NativeSocketSampler* self = _instance; + if (!self->isSocket(fd)) { +#ifdef DEBUG + uint64_t n = _read_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::read_hook #%llu fd=%d len=%zu is_socket=0", + (unsigned long long)(n + 1), fd, len); + } +#endif + return _orig_read(fd, buf, len); + } +#ifdef DEBUG + { + uint64_t n = _read_hook_calls.fetch_add(1, std::memory_order_relaxed); + if (n < 3 || (n & 0x3FF) == 0) { + TEST_LOG("NativeSocketSampler::read_hook #%llu fd=%d len=%zu is_socket=1", + (unsigned long long)(n + 1), fd, len); + } + } +#endif + u64 t0 = TSC::ticks(); + ssize_t ret = _orig_read(fd, buf, len); + u64 t1 = TSC::ticks(); + if (ret > 0) { + self->recordEvent(fd, t0, t1, ret, 1); + } + return ret; +} + +Error NativeSocketSampler::check(Arguments &args) { + if (!args._nativesocket) { + return Error("natsock profiling not requested"); + } + return Error::OK; +} + +Error NativeSocketSampler::start(Arguments &args) { + // 
Initial sampling period: args._nativesocket_interval (ns) when > 0, + // otherwise 1 ms default. Converted to TSC ticks (time-weighted sampling). + long init_interval; + if (args._nativesocket_interval > 0) { + init_interval = (long)((u64)args._nativesocket_interval + * TSC::frequency() / 1000000000ULL); + } else { + init_interval = (long)(TSC::frequency() / 1000); + } + if (init_interval < 1) { + init_interval = DEFAULT_INTERVAL_TICKS; + } + // One limiter for all four hooks (send/write and recv/read): ~83 events/s (~5000/min) total. + // A large interval driven by heavy write traffic does not suppress long blocking reads: + // time-weighted sampling gives P = 1 - exp(-duration/interval) → 1 when duration >> interval, + // so slow calls self-select regardless of interval magnitude. Only short-duration calls + // (which carry no latency signal) are suppressed when the interval is large. + _rate_limiter.start(init_interval, TARGET_EVENTS_PER_SECOND, + PID_WINDOW_SECS, PID_P_GAIN, PID_I_GAIN, PID_D_GAIN, PID_CUTOFF_S); + // Reset fd-type cache so stale classifications from a prior session don't + // cause write/read hooks to skip newly opened socket fds. 
+ for (int i = 0; i < FD_TYPE_CACHE_SIZE; i++) { + _fd_type_cache[i].store(FD_TYPE_UNKNOWN, std::memory_order_relaxed); + } +#ifdef DEBUG + _send_hook_calls.store(0, std::memory_order_relaxed); + _recv_hook_calls.store(0, std::memory_order_relaxed); + _write_hook_calls.store(0, std::memory_order_relaxed); + _read_hook_calls.store(0, std::memory_order_relaxed); + _record_accept_calls.store(0, std::memory_order_relaxed); + _record_reject_calls.store(0, std::memory_order_relaxed); + TEST_LOG("NativeSocketSampler::start interval_ticks=%ld tsc_freq=%llu", + init_interval, (unsigned long long)TSC::frequency()); +#endif + LibraryPatcher::patch_socket_functions(); + return Error::OK; +} + +void NativeSocketSampler::stop() { +#ifdef DEBUG + TEST_LOG("NativeSocketSampler::stop summary send=%llu recv=%llu write=%llu read=%llu accept=%llu reject=%llu", + (unsigned long long)_send_hook_calls.load(std::memory_order_relaxed), + (unsigned long long)_recv_hook_calls.load(std::memory_order_relaxed), + (unsigned long long)_write_hook_calls.load(std::memory_order_relaxed), + (unsigned long long)_read_hook_calls.load(std::memory_order_relaxed), + (unsigned long long)_record_accept_calls.load(std::memory_order_relaxed), + (unsigned long long)_record_reject_calls.load(std::memory_order_relaxed)); +#endif + LibraryPatcher::unpatch_socket_functions(); + clearFdCache(); +} + +void NativeSocketSampler::clearFdCache() { + std::lock_guard lock(_fd_cache_mutex); + _fd_cache.clear(); +} + +#else // !(__linux__ && __GLIBC__) + +NativeSocketSampler* const NativeSocketSampler::_instance = new NativeSocketSampler(); + +#endif // __linux__ && __GLIBC__ diff --git a/ddprof-lib/src/main/cpp/nativeSocketSampler.h b/ddprof-lib/src/main/cpp/nativeSocketSampler.h new file mode 100644 index 000000000..e42114c01 --- /dev/null +++ b/ddprof-lib/src/main/cpp/nativeSocketSampler.h @@ -0,0 +1,162 @@ +/* + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _NATIVESOCKETSAMPLER_H +#define _NATIVESOCKETSAMPLER_H + +#include "arch.h" +#include "arguments.h" +#include "engine.h" +#include "event.h" + +#if defined(__linux__) && defined(__GLIBC__) + +#include "poissonSampler.h" +#include "rateLimiter.h" +#include +#include +#include +#include + +class LibraryPatcher; + +// Synchronisation strategy +// ------------------------- +// Hook functions (send_hook / recv_hook / write_hook / read_hook) run on the +// calling Java thread, NOT in a signal handler. Therefore malloc and locking +// are safe inside hooks. +// +// fd-to-addr cache : guarded by _fd_cache_mutex (std::mutex). +// _fd_type_cache : std::atomic array, lock-free. Entries: +// 0 = unknown, 1 = TCP socket, 2 = non-TCP. +// Written once per fd; stale after fd reuse but the +// worst outcome is a single missed event. +// _rate_limiter : RateLimiter — owns std::atomic interval, epoch, and +// event count. PID update races are resolved by CAS +// inside RateLimiter::maybeUpdateInterval(). +// Sampling state : thread_local PoissonSampler (in nativeSocketSampler.cpp). +// No cross-thread contention; each thread maintains its +// own independent Poisson process. The per-second PID +// window observes the aggregate fire count via the shared +// atomic inside RateLimiter. +// Hook install/remove : guarded by the profiler's main state lock (MutexLocker +// in Profiler::start / Profiler::stop). No deadlock +// risk because hook bodies do NOT acquire the profiler +// signal lock. + +class NativeSocketSampler : public Engine { + friend class LibraryPatcher; + +public: + // Typedefs for libc send/recv/write/read signatures. 
+ typedef ssize_t (*send_fn)(int, const void*, size_t, int); + typedef ssize_t (*recv_fn)(int, void*, size_t, int); + typedef ssize_t (*write_fn)(int, const void*, size_t); + typedef ssize_t (*read_fn)(int, void*, size_t); + + static NativeSocketSampler* instance() { return _instance; } + + Error check(Arguments &args) override; + Error start(Arguments &args) override; + void stop() override; + + // Clears the fd-to-address cache and resets the fd-type cache. + // Called only from stop(); intentionally NOT called on JFR chunk boundaries. + void clearFdCache(); + + // PLT hooks installed by LibraryPatcher::patch_socket_functions(). + static ssize_t send_hook(int fd, const void* buf, size_t len, int flags); + static ssize_t recv_hook(int fd, void* buf, size_t len, int flags); + static ssize_t write_hook(int fd, const void* buf, size_t len); + static ssize_t read_hook(int fd, void* buf, size_t len); + + // Original libc function pointers; set by LibraryPatcher, restored on stop. + static send_fn _orig_send; + static recv_fn _orig_recv; + static write_fn _orig_write; + static read_fn _orig_read; + +private: + static NativeSocketSampler* const _instance; + + // Target aggregate event rate: ~83 events/s (~5000/min) across all four hooks + // (send/write and recv/read) combined. + static const int TARGET_EVENTS_PER_SECOND = 83; + static const int PID_WINDOW_SECS = 1; + + // PID controller gains. Tuned for a 1-second observation window targeting + // ~83 events/s. P=31 is proportional gain; I=511 accumulates steady-state + // error over the window; D=3 damps oscillation; cutoff=15s low-passes the + // derivative to suppress high-frequency noise. + static constexpr double PID_P_GAIN = 31.0; + static constexpr double PID_I_GAIN = 511.0; + static constexpr double PID_D_GAIN = 3.0; + static constexpr double PID_CUTOFF_S = 15.0; + + // Default sampling interval in TSC ticks (~1 ms). + // Initialised in start() once TSC::frequency() is available. 
+ static const long DEFAULT_INTERVAL_TICKS = 1000000; // fallback; overwritten at start + + // Rate limiter: owns the PID controller, interval, epoch, and fire counter. + // NativeSocketSampler uses it directly (not via RateLimitedSampler) because + // it has two sampling channels (send + recv) that share one rate target but + // need independent per-thread PoissonSampler state. + RateLimiter _rate_limiter; + + // fd -> "ip:port" string cache. Bounded to MAX_FD_CACHE entries; no + // eviction is performed (entries for closed/reused fds are stale until + // the next stop(), but stale addresses are a known, accepted limitation). + static const int MAX_FD_CACHE = 65536; + std::unordered_map _fd_cache; + std::mutex _fd_cache_mutex; + + // fd-type cache for write/read hooks. Values: 0=unknown, 1=TCP socket, + // 2=non-TCP. Lock-free: one atomic byte per fd number. + // Fds outside [0, FD_TYPE_CACHE_SIZE) are probed on every call. + static const int FD_TYPE_CACHE_SIZE = 65536; + static const uint8_t FD_TYPE_UNKNOWN = 0; + static const uint8_t FD_TYPE_SOCKET = 1; + static const uint8_t FD_TYPE_NON_SOCKET = 2; + std::atomic _fd_type_cache[FD_TYPE_CACHE_SIZE]; + + NativeSocketSampler() = default; + + // Resolve the peer address for fd; returns empty string on failure. + std::string resolveAddr(int fd); + + // Returns true if fd is a connected TCP socket (SOCK_STREAM). + // Uses the fd-type cache; calls getsockopt on first encounter per fd. + bool isSocket(int fd); + + // Decide whether to sample and compute weight. + // Returns true if the call should be recorded; sets weight out-param. + // Implements per-thread Poisson-process sampling: each thread maintains its + // own Exp-distributed countdown; when it expires the event is sampled and a + // new countdown is drawn. weight = 1 / (1 - exp(-duration/interval)). + // duration_ticks: wall time of the I/O call in TSC ticks. + // op: 0 = send, 1 = recv. 
+ bool shouldSample(u64 duration_ticks, int op, float &weight); + + // Common recording logic shared by all four hooks. + void recordEvent(int fd, u64 t0, u64 t1, ssize_t bytes, u8 op); +}; + +#else // !(__linux__ && __GLIBC__) + +class NativeSocketSampler : public Engine { +public: + static NativeSocketSampler* instance() { return _instance; } + Error check(Arguments &args) override { return Error::OK; } + Error start(Arguments &args) override { return Error::OK; } + void stop() override {} + void clearFdCache() {} +private: + static NativeSocketSampler* const _instance; + NativeSocketSampler() {} +}; + +#endif // __linux__ && __GLIBC__ + +#endif // _NATIVESOCKETSAMPLER_H diff --git a/ddprof-lib/src/main/cpp/poissonSampler.h b/ddprof-lib/src/main/cpp/poissonSampler.h new file mode 100644 index 000000000..30f2869f4 --- /dev/null +++ b/ddprof-lib/src/main/cpp/poissonSampler.h @@ -0,0 +1,220 @@ +/* + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _POISSONSAMPLER_H +#define _POISSONSAMPLER_H + +#include "arch.h" +#include + +/** + * @file poissonSampler.h + * + * Thread-local, dimension-agnostic Poisson-process sampler. + * + * ## Concept + * + * Many profiler engines need to sub-sample a high-frequency stream of + * measurements and produce an unbiased aggregate estimate from the surviving + * samples. The classic approach is a deterministic threshold counter (fire + * every N units), but that creates phase-locking artifacts and handles + * measurements that span multiple intervals poorly. + * + * PoissonSampler models the stream as a Poisson process: between consecutive + * sample points the gap is drawn independently from Exp(mean = @p interval). + * This guarantees memoryless inter-arrival times and correct behaviour for + * measurements of any size relative to the interval. 
+ * + * ## Sampling decision + * + * The sampler maintains a monotonically growing accumulator @c _used and a + * Exp-distributed threshold @c _threshold. On each call: + * + * 1. @p value is added to @c _used. + * 2. If @c _used < @c _threshold: no sample, return false. + * 3. Otherwise: the threshold has been crossed. Advance @c _threshold by + * a fresh draw from Exp(@p interval) so the next gap is independent. + * Compute and return the weight (see below), return true. + * + * ## Weight formula and estimator invariant + * + * The probability that a Poisson process with rate 1/@p interval produces + * at least one event during an interval of length @p value is: + * + * P = 1 - exp(-value / interval) + * + * The inverse-transform weight is: + * + * weight = 1 / P = 1 / (1 - exp(-value / interval)) + * + * This satisfies the unbiasedness invariant for every measurement: + * + * E[weight * value | sampled] * P(sampled) = (1/P) * value * P = value + * + * Therefore sum(weight_i * value_i) over all sampled events is an unbiased + * estimator of the total accumulated dimension over the recording window, + * regardless of the distribution of individual measurement sizes: + * + * - value << interval → P ≈ value/interval, weight ≈ interval/value, + * weight * value ≈ interval (one interval per event) + * - value >> interval → P ≈ 1, weight ≈ 1, + * weight * value ≈ value (full measurement credited) + * + * ## Dimension agnosticism + * + * The class is generic over the accumulated dimension. The caller chooses + * what @p value represents; @p interval must be in the same units: + * + * - TSC ticks → latency / time-in-I/O profiling + * - Bytes → throughput / allocation profiling + * - Event count → frequency profiling (pass value = 1 per event) + * + * ## Thread safety and epoch-based reset + * + * Instances must be declared @c thread_local. All state is private to the + * owning thread; no locks or atomics are used in the hot path. 
+ * + * To support profiler restart, the engine exposes a shared + * @c std::atomic epoch counter that it increments on @c start(). + * Each PoissonSampler caches the last seen epoch; when it differs the + * sampler reinitialises lazily on the next @c sample() call, re-seeding + * the PRNG and resetting the accumulator and threshold. + * + * ## Usage + * + * @code + * // Engine header: + * std::atomic _epoch{0}; // bumped in start() + * std::atomic _interval; // PID-controlled mean gap + * + * // Engine translation unit: + * static thread_local PoissonSampler _send_sampler; + * static thread_local PoissonSampler _recv_sampler; + * + * // In the hook / measurement path: + * float weight; + * if (_send_sampler.sample(value, (u64)_interval.load(), _epoch.load(), weight)) { + * // record event; sum(weight * value) estimates total accumulated value + * } + * @endcode + */ +class PoissonSampler { +public: + /** + * Decide whether to sample this measurement and compute its weight. + * + * @param value The measurement for this call, in the chosen unit. + * Must be in the same units as @p interval. + * A value of 0 is never sampled. + * @param interval Mean inter-sample gap in the same units as @p value. + * Controls the average number of events recorded per + * unit of accumulated @p value. Must be > 0. + * @param epoch_now Current profiler epoch from the owning engine's + * shared atomic. When it differs from the cached epoch + * the sampler resets all state before evaluating the + * current measurement. + * @param weight [out] Set on true return to 1/(1-exp(-value/interval)). + * Multiply by @p value to get this event's contribution + * to the total-accumulated-value estimate. + * @return true if this measurement should be recorded. 
+ */ + bool sample(u64 value, u64 interval, u64 epoch_now, float &weight) { + if (value == 0 || interval == 0) { + return false; + } + if (_epoch != epoch_now) { + reset(interval, epoch_now); + } + _used += value; + if (_used < _threshold) { + return false; + } + // Threshold crossed: advance by a fresh Exp draw so the next + // inter-arrival gap is independent of all previous ones. + _threshold += nextExp(interval); + float p = 1.0f - expf(-(float)value / (float)interval); + weight = (p > 0.0f) ? 1.0f / p : 1.0f; + return true; + } + +private: + u64 _epoch{0}; // last seen profiler epoch; 0 = not yet initialised + u64 _used{0}; // accumulated value since the last threshold crossing + u64 _threshold{0}; // next Exp-distributed threshold to cross + u64 _rng{0}; // xorshift64 PRNG state; must never be 0 + + /** + * Reinitialise all state for a new profiling session. + * + * The PRNG is seeded from the instance's own address XOR'd with a + * multiple of the new epoch. Because @c thread_local instances live at + * fixed but thread-specific addresses, and because distinct samplers + * within the same thread occupy different addresses, each (thread, + * sampler, session) triple receives an independent random stream. + */ + void reset(u64 interval, u64 epoch_now) { + // Fibonacci hashing of epoch_now spreads low-entropy epoch values + // across the full 64-bit range before XOR-ing with the address. + _rng = (u64)(uintptr_t)this ^ (epoch_now * 0x9e3779b97f4a7c15ULL); + if (_rng == 0) _rng = 1; // xorshift64 must not start at 0 + _used = 0; + _threshold = nextExp(interval); + _epoch = epoch_now; + } + + /** + * Draw one sample from Exp(mean = @p interval). + * + * Uses xorshift64 to produce a uniform pseudo-random value, then applies + * the inverse CDF of the exponential distribution: + * + * X = -interval * ln(U), U ~ Uniform(0, 1] + * + * Adding 0.5 ULP before the float scaling centres the rounding and + * ensures U is never exactly 0 (which would produce +infinity). 
+ * The magic constant 5.42101086e-20 ≈ 1/2^64 converts a u64 to [0, 1]. + * + * ### Why xorshift64 instead of a C++ standard generator? + * + * The C++ facility (std::mt19937, std::minstd_rand, …) is + * unsuitable here for several reasons: + * + * 1. **Hot-path overhead.** nextExp() is called only once per fired + * event (~83 times/second at the default rate), so raw throughput + * is not the primary concern. The concern is code-size and + * instruction-cache pressure: std::mt19937 carries ~2.5 KB of + * state and its generate step touches all of it. xorshift64 fits + * in a single 8-byte field already present in the struct. + * + * 2. **Seeding.** std::random_device — the canonical seed source — + * may block, throw, or return low-entropy values on some Linux + * configurations (e.g., early boot, containers without /dev/urandom + * entropy). Our seed (instance address XOR epoch hash) is always + * available, zero-cost, and produces independent streams per thread + * and per profiling session without any OS interaction. + * + * 3. **No allocation, no exceptions.** std::random_device and the + * distribution wrappers (std::uniform_real_distribution, etc.) may + * allocate and may throw. This code runs inside PLT hooks that + * intercept arbitrary application threads; allocation and exception + * handling in that context would be unsafe. + * + * 4. **Statistical sufficiency.** xorshift64 passes the Diehard and + * BigCrush test suites and is well-established for Monte Carlo + * sampling. The inverse-CDF transform amplifies non-uniformity + * only near U ≈ 0 (i.e., extremely large Exp draws), which + * correspond to very long inter-sample gaps — a rare tail that has + * negligible effect on aggregate estimates. 
+ */ + u64 nextExp(u64 interval) { + _rng ^= _rng << 13; + _rng ^= _rng >> 7; + _rng ^= _rng << 17; + float u = ((float)_rng + 0.5f) * 5.42101086e-20f; + return (u64)(-(float)interval * logf(u)); + } +}; + +#endif // _POISSONSAMPLER_H diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index 18a6230f1..1968487db 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -7,6 +7,8 @@ #include #include "profiler.h" #include "asyncSampleMutex.h" +#include "mallocTracer.h" +#include "nativeSocketSampler.h" #include "context.h" #include "context_api.h" #include "guards.h" @@ -59,6 +61,7 @@ static void (*orig_segvHandler)(int signo, siginfo_t *siginfo, void *ucontext); static void (*orig_busHandler)(int signo, siginfo_t *siginfo, void *ucontext); static Engine noop_engine; +static MallocTracer malloc_tracer; static PerfEvents perf_events; static WallClockASGCT wall_asgct_engine; static J9WallClock j9_engine; @@ -545,10 +548,9 @@ void Profiler::recordSample(void *ucontext, u64 counter, int tid, num_frames += getNativeTrace(ucontext, native_stop, event_type, tid, &java_ctx, &truncated, lock_index); assert(num_frames >= 0); - + int max_remaining = _max_stack_depth - num_frames; if (max_remaining > 0) { - // Walk Java frames if we have room, but only for mixed mode or CPU/Wall events with cstack enabled. For async events, we want to avoid walking Java frames in the signal handler if possible, since it can lead to deadlocks. Instead, we'll try to get the Java trace asynchronously after the signal handler returns. 
    StackWalkRequest request = {event_type, lock_index, ucontext, frames + num_frames, max_remaining, &java_ctx, &truncated};
    num_frames += JVMSupport::walkJavaStack(request);
  }
@@ -576,6 +578,7 @@
   _locks[lock_index].unlock();
 }
 
+
 void Profiler::recordWallClockEpoch(int tid, WallClockEpochEvent *event) {
   u32 lock_index = getLockIndex(tid);
   if (!_locks[lock_index].tryLock() &&
@@ -684,6 +687,8 @@ void *Profiler::dlopen_hook(const char *filename, int flags) {
   Libraries::instance()->updateSymbols(false);
   // Patch sigaction in newly loaded libraries
   LibraryPatcher::patch_sigaction();
+  MallocTracer::installHooks();
+  LibraryPatcher::install_socket_hooks();
   // Extract build-ids for newly loaded libraries if remote symbolication is enabled
   Profiler* profiler = instance();
   if (profiler != nullptr && profiler->_remote_symbolication) {
@@ -1040,7 +1047,9 @@ Error Profiler::start(Arguments &args, bool reset) {
       (args._cpu >= 0 ? EM_CPU : 0) | (args._wall >= 0 ? EM_WALL : 0) |
       (args._record_allocations || args._record_liveness || args._gc_generations ? EM_ALLOC
-          : 0);
+          : 0) |
+      (args._nativemem >= 0 ? EM_NATIVEMEM : 0) |
+      (args._nativesocket ? EM_NATIVESOCKET : 0);
 
   if (_event_mask == 0) {
     return Error("No profiling events specified");
   }
@@ -1136,8 +1145,9 @@ Error Profiler::start(Arguments &args, bool reset) {
     Log::warn("Branch stack is supported only with PMU events");
   } else if (_cstack == CSTACK_VM) {
     if (!VMStructs::hasStackStructs()) {
-      return Error(
-          "VMStructs stack walking is not supported on this JVM/platform");
+      _cstack = CSTACK_DEFAULT;
+      Log::error("VMStructs stack walking is not supported on this JVM/platform, "
+                 "defaulting to frame pointer unwinding. 
Native malloc stack traces will be unavailable."); } } @@ -1197,6 +1207,24 @@ Error Profiler::start(Arguments &args, bool reset) { } } } + if (_event_mask & EM_NATIVEMEM) { + error = malloc_tracer.start(args); + if (error) { + Log::warn("%s", error.message()); + error = Error::OK; // recoverable + } else { + activated |= EM_NATIVEMEM; + } + } + if (_event_mask & EM_NATIVESOCKET) { + error = NativeSocketSampler::instance()->start(args); + if (error) { + Log::warn("%s", error.message()); + error = Error::OK; // recoverable + } else { + activated |= EM_NATIVESOCKET; + } + } if (activated) { switchThreadEvents(JVMTI_ENABLE); @@ -1235,6 +1263,10 @@ Error Profiler::stop() { if (_event_mask & EM_ALLOC) _alloc_engine->stop(); + if (_event_mask & EM_NATIVEMEM) + malloc_tracer.stop(); + if (_event_mask & EM_NATIVESOCKET) + NativeSocketSampler::instance()->stop(); if (_event_mask & EM_WALL) _wall_engine->stop(); if (_event_mask & EM_CPU) @@ -1283,6 +1315,12 @@ Error Profiler::check(Arguments &args) { _alloc_engine = selectAllocEngine(args); error = _alloc_engine->check(args); } + if (!error && args._nativemem >= 0) { + error = malloc_tracer.check(args); + } + if (!error && args._nativesocket) { + error = NativeSocketSampler::instance()->check(args); + } if (!error) { if (args._cstack == CSTACK_DWARF && !DWARF_SUPPORTED) { return Error("DWARF unwinding is not supported on this platform"); diff --git a/ddprof-lib/src/main/cpp/rateLimiter.h b/ddprof-lib/src/main/cpp/rateLimiter.h new file mode 100644 index 000000000..88cc02596 --- /dev/null +++ b/ddprof-lib/src/main/cpp/rateLimiter.h @@ -0,0 +1,113 @@ +/* + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _RATELIMITER_H +#define _RATELIMITER_H + +#include "arch.h" +#include "os.h" +#include "pidController.h" +#include + +/** + * Thread-safe rate limiter based on a PID controller. 
+ * + * Maintains a shared target event rate by adjusting a sampling interval + * (in arbitrary units — TSC ticks, bytes, counts, …) via a PID feedback + * loop driven by the aggregate observed fire count across all threads. + * + * ## Typical use + * + * One RateLimiter instance is shared across threads. Per-thread sampling + * decisions are made by a companion sampler (e.g. PoissonSampler) that + * reads interval() and epoch() from this object. After each sampled event + * the thread calls recordFire() to feed back into the rate controller. + * + * ## Epoch-based lazy reset + * + * start() bumps an epoch counter. Per-thread samplers compare their cached + * epoch against epoch() on every sample call; a mismatch triggers a lazy + * reinitialisation, so no explicit iteration over threads is needed at start. + */ +class RateLimiter { +public: + RateLimiter() = default; + + /** + * Initialise for a new profiling session. + * + * @param init_interval_units Initial sampling interval in the chosen unit + * (e.g. TSC ticks for ~1 ms). Must be >= 1. + * @param target_per_second Target aggregate fire rate (events / second). + * @param pid_window_secs PID observation window in seconds. + * @param p_gain PID proportional gain. + * @param i_gain PID integral gain. + * @param d_gain PID derivative gain. + * @param cutoff_secs PID derivative low-pass cutoff in seconds. + */ + void start(long init_interval_units, + u64 target_per_second, + int pid_window_secs, + double p_gain, double i_gain, double d_gain, + double cutoff_secs) { + _interval.store(init_interval_units, std::memory_order_release); + _event_count.store(0, std::memory_order_relaxed); + _last_update_ns.store(OS::nanotime(), std::memory_order_release); + _epoch.fetch_add(1, std::memory_order_release); + _pid = PidController(target_per_second, p_gain, i_gain, d_gain, + pid_window_secs, cutoff_secs); + } + + /** Current sampling interval in the chosen unit. 
*/ + long interval() const { + return _interval.load(std::memory_order_relaxed); + } + + /** Current epoch; bumped on every start(). */ + u64 epoch() const { + return _epoch.load(std::memory_order_relaxed); + } + + /** + * Record one sampled event and update the PID controller at most once + * per second. Safe to call from any thread concurrently. + */ + void recordFire() { + _event_count.fetch_add(1, std::memory_order_relaxed); + maybeUpdateInterval(); + } + +private: + static const u64 ONE_SECOND_NS = 1000000000ULL; + + std::atomic _interval{1}; + std::atomic _epoch{0}; + std::atomic _event_count{0}; + std::atomic _last_update_ns{0}; + PidController _pid{1, 1.0, 1.0, 1.0, 1, 1.0}; + + void maybeUpdateInterval() { + u64 now = OS::nanotime(); + u64 prev = _last_update_ns.load(std::memory_order_relaxed); + if (now - prev < ONE_SECOND_NS) { + return; + } + if (!_last_update_ns.compare_exchange_strong(prev, now, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return; + } + long count = _event_count.exchange(0, std::memory_order_relaxed); + double signal = _pid.compute(static_cast(count), 1.0); + long new_interval = _interval.load(std::memory_order_relaxed) + - static_cast(signal); + if (new_interval < 1) { + new_interval = 1; + } + _interval.store(new_interval, std::memory_order_relaxed); + } +}; + +#endif // _RATELIMITER_H diff --git a/ddprof-lib/src/main/cpp/vmEntry.h b/ddprof-lib/src/main/cpp/vmEntry.h index 322436558..2a346a0f3 100644 --- a/ddprof-lib/src/main/cpp/vmEntry.h +++ b/ddprof-lib/src/main/cpp/vmEntry.h @@ -33,6 +33,8 @@ enum ASGCT_CallFrameType { BCI_THREAD_ID = -17, // method_id designates a thread BCI_ERROR = -18, // method_id is an error string BCI_NATIVE_FRAME_REMOTE = -19, // method_id points to RemoteFrameInfo for remote symbolication + BCI_NATIVE_MALLOC = -20, // native malloc/free sample (size stored in counter) + BCI_NATIVE_SOCKET = -21, // native socket I/O sample (bytes stored in counter) }; // See 
hotspot/src/share/vm/prims/forte.cpp diff --git a/ddprof-lib/src/test/cpp/nativeSocketSampler_ut.cpp b/ddprof-lib/src/test/cpp/nativeSocketSampler_ut.cpp new file mode 100644 index 000000000..fb2c17c75 --- /dev/null +++ b/ddprof-lib/src/test/cpp/nativeSocketSampler_ut.cpp @@ -0,0 +1,103 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#if defined(__linux__) && defined(__GLIBC__) + +#include "nativeSocketSampler.h" + +#include +#include + +// --------------------------------------------------------------------------- +// Stub tracking +// --------------------------------------------------------------------------- + +static std::atomic g_send_calls{0}; +static std::atomic g_recv_calls{0}; +static std::atomic g_send_ret{-1}; +static std::atomic g_recv_ret{-1}; + +static ssize_t stub_send(int /*fd*/, const void* /*buf*/, size_t /*len*/, int /*flags*/) { + g_send_calls++; + return g_send_ret.load(); +} + +static ssize_t stub_recv(int /*fd*/, void* /*buf*/, size_t /*len*/, int /*flags*/) { + g_recv_calls++; + return g_recv_ret.load(); +} + +// --------------------------------------------------------------------------- +// Test fixture — installs stubs as the "original" function pointers so the +// hooks invoke them without needing GOT patching or a running JVM. 
+// --------------------------------------------------------------------------- + +class NativeSocketSamplerHookTest : public ::testing::Test { +protected: + NativeSocketSampler::send_fn _saved_send; + NativeSocketSampler::recv_fn _saved_recv; + + void SetUp() override { + _saved_send = NativeSocketSampler::_orig_send; + _saved_recv = NativeSocketSampler::_orig_recv; + NativeSocketSampler::_orig_send = stub_send; + NativeSocketSampler::_orig_recv = stub_recv; + g_send_calls = 0; + g_recv_calls = 0; + } + + void TearDown() override { + NativeSocketSampler::_orig_send = _saved_send; + NativeSocketSampler::_orig_recv = _saved_recv; + } +}; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/** + * Verifies that send_hook forwards the call to _orig_send and returns its + * return value when the original function indicates failure (ret <= 0 so the + * sampling path — which needs a JVM — is not entered). + */ +TEST_F(NativeSocketSamplerHookTest, SendHookCallsOrigSendAndReturnsValue) { + g_send_ret = -1; // error path avoids the JVM-dependent sampling code + char buf[16] = {}; + + ssize_t ret = NativeSocketSampler::send_hook(0, buf, sizeof(buf), 0); + + EXPECT_EQ(g_send_calls.load(), 1) << "send_hook must call _orig_send exactly once"; + EXPECT_EQ(ret, -1) << "send_hook must propagate the return value from _orig_send"; +} + +/** + * Verifies that recv_hook forwards the call to _orig_recv and returns its + * return value (same guard: ret <= 0 skips the JVM path). 
+ */ +TEST_F(NativeSocketSamplerHookTest, RecvHookCallsOrigRecvAndReturnsValue) { + g_recv_ret = 0; + char buf[16] = {}; + + ssize_t ret = NativeSocketSampler::recv_hook(0, buf, sizeof(buf), 0); + + EXPECT_EQ(g_recv_calls.load(), 1) << "recv_hook must call _orig_recv exactly once"; + EXPECT_EQ(ret, 0) << "recv_hook must propagate the return value from _orig_recv"; +} + +#endif // __linux__ && __GLIBC__ diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/NativeSocketOverheadBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/NativeSocketOverheadBenchmark.java new file mode 100644 index 000000000..b3b787f34 --- /dev/null +++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/NativeSocketOverheadBenchmark.java @@ -0,0 +1,173 @@ +/* + * Copyright 2026, Datadog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datadoghq.profiler.stresstest.scenarios.throughput; + +import com.datadoghq.profiler.JavaProfiler; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Measures overhead of the nativesocket PLT write/read hooks on: + *
    + *
  • fileWrite — write() to a regular file. After the first call the + * fd-type cache classifies it as non-socket (one atomic load per subsequent + * call). This is the worst-case overhead scenario for code that does heavy + * file I/O with nativesocket enabled. + *
  • socketWrite — write() to a TCP socket (blocking, PlainSocket). + *
  • nioSocketWrite — write() via NIO SocketChannel (JDK11+ path). + *
+ * + *

Compare {@code profilerActive=false} vs {@code profilerActive=true} to + * quantify the hook overhead. Revert if fileWrite throughput degrades > 5%. + * + *

The profiler uses time-weighted (duration-based) inverse-transform + * sampling: {@code P(sample) = 1 - exp(-duration_ticks / interval_ticks)}. + * Slow I/O calls are sampled more often; fast calls are down-sampled. + * + *

+ *   ./gradlew :ddprof-stresstest:jmh -PjmhInclude="NativeSocketOverheadBenchmark"
+ * 
+ */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1, warmups = 0) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 3) +@State(Scope.Thread) +public class NativeSocketOverheadBenchmark { + + private static final int CHUNK = 4096; + + @Param({"false", "true"}) + public boolean profilerActive; + + private Path tmpFile; + private OutputStream fileOut; + + private ServerSocket server; + private Socket client; + private Socket serverConn; + private OutputStream sockOut; + private Thread serverAcceptor; + + private ServerSocket nioServer; + private SocketChannel nioClient; + private ByteBuffer nioBuf; + + private final byte[] buf = new byte[CHUNK]; + + @Setup(Level.Trial) + public void setup() throws Exception { + if (profilerActive) { + JavaProfiler profiler = JavaProfiler.getInstance(); + Path jfr = Files.createTempFile("nativesocket-bench", ".jfr"); + profiler.execute("start,nativesocket,jfr,file=" + jfr.toAbsolutePath()); + } + + // File I/O: regular file, will be classified non-socket after first write + tmpFile = Files.createTempFile("nativesocket-bench-file", ".bin"); + fileOut = Files.newOutputStream(tmpFile); + + // Blocking TCP socket (PlainSocket / JDK 8 send path) + server = new ServerSocket(0); + serverAcceptor = new Thread(() -> { + try { + serverConn = server.accept(); + // drain so the client write buffer never fills + InputStream drain = serverConn.getInputStream(); + byte[] dbuf = new byte[CHUNK]; + while (drain.read(dbuf) != -1) { /* drain */ } + } catch (IOException ignored) {} + }); + serverAcceptor.setDaemon(true); + serverAcceptor.start(); + client = new Socket("127.0.0.1", server.getLocalPort()); + sockOut = client.getOutputStream(); + + // NIO SocketChannel (write(2) path used by JDK 11+) + nioServer = new ServerSocket(0); + Thread nioAcceptor = new Thread(() -> { + try { + Socket ac = nioServer.accept(); + // drain + InputStream drain = ac.getInputStream(); + byte[] dbuf = new 
byte[CHUNK]; + while (drain.read(dbuf) != -1) { /* drain */ } + } catch (IOException ignored) {} + }); + nioAcceptor.setDaemon(true); + nioAcceptor.start(); + nioClient = SocketChannel.open(new InetSocketAddress("127.0.0.1", nioServer.getLocalPort())); + nioBuf = ByteBuffer.allocate(CHUNK); + } + + @TearDown(Level.Trial) + public void teardown() throws Exception { + if (profilerActive) { + JavaProfiler.getInstance().execute("stop"); + } + fileOut.close(); + Files.deleteIfExists(tmpFile); + client.close(); + if (serverConn != null) serverConn.close(); + server.close(); + nioClient.close(); + nioServer.close(); + } + + /** write() to a regular file — measures fd-type-cache overhead on non-socket fds. */ + @Benchmark + public void fileWrite() throws IOException { + fileOut.write(buf); + } + + /** write() to a blocking TCP socket — the socket sampling path. */ + @Benchmark + public void socketWrite() throws IOException { + sockOut.write(buf); + } + + /** SocketChannel.write() — the NIO path used by JDK 11+ java.net.Socket. 
*/ + @Benchmark + public long nioSocketWrite() throws IOException { + nioBuf.clear(); + nioBuf.put(buf); + nioBuf.flip(); + return nioClient.write(nioBuf); + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/AbstractProfilerTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/AbstractProfilerTest.java index 53137f8df..de75c2f06 100644 --- a/ddprof-test/src/test/java/com/datadoghq/profiler/AbstractProfilerTest.java +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/AbstractProfilerTest.java @@ -286,8 +286,15 @@ private void checkConfig() { long wallIntervalMillis = wallIntervalAccessor.getMember(item).longValueIn(MILLISECOND); if (!Platform.isJ9() && Platform.isJavaVersionAtLeast(11)) { // fixme J9 engine have weird defaults and need fixing - assertEquals(cpuInterval.toMillis(), cpuIntervalMillis); - assertEquals(wallInterval.toMillis(), wallIntervalMillis); + // Only assert intervals that were explicitly requested in the profiler + // command; engines not requested carry default intervals that do not + // match the (absent) command value. + if (cpuInterval.toMillis() > 0) { + assertEquals(cpuInterval.toMillis(), cpuIntervalMillis); + } + if (wallInterval.toMillis() > 0) { + assertEquals(wallInterval.toMillis(), wallIntervalMillis); + } } } } diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativemem/NativememProfilerTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativemem/NativememProfilerTest.java new file mode 100644 index 000000000..e05f009f0 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativemem/NativememProfilerTest.java @@ -0,0 +1,97 @@ +/* + * Copyright 2026, Datadog, Inc. 
+ * SPDX-License-Identifier: Apache-2.0 + */ +package com.datadoghq.profiler.nativemem; + +import com.datadoghq.profiler.CStackAwareAbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import com.datadoghq.profiler.junit.CStack; +import com.datadoghq.profiler.junit.RetryTest; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.params.provider.ValueSource; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.openjdk.jmc.common.item.Attribute.attr; +import static org.openjdk.jmc.common.unit.UnitLookup.ADDRESS; + +/** + * Smoke tests for native memory (malloc) profiling. + * + *

Runs with {@code cstack=vm} and {@code cstack=vmx}. In {@code vmx} mode the + * stack trace is expected to contain {@code allocateDirect} because the mixed-mode + * walker captures the Java call chain that triggered the native allocation. + */ +public class NativememProfilerTest extends CStackAwareAbstractProfilerTest { + + private static final IAttribute MALLOC_ADDRESS = attr("address", "address", "", ADDRESS); + + public NativememProfilerTest(@CStack String cstack) { + super(cstack); + } + + @Override + protected String getProfilerCommand() { + return "nativemem=0"; // sample every allocation + } + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isJ9() && !Platform.isZing(); + } + + @RetryTest(3) + @TestTemplate + @ValueSource(strings = {"vm", "vmx"}) + public void shouldRecordMallocSamples() throws InterruptedException { + // GOT patching conflicts with ASan/TSan interceptors: both replace malloc/free + // symbols, causing undefined behavior or crashes when hooks chain into each other. 
+ Assumptions.assumeFalse(isAsan() || isTsan()); + + List buffers = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + buffers.add(ByteBuffer.allocateDirect(1024)); + } + + stopProfiler(); + + IItemCollection events = verifyEvents("profiler.Malloc"); + boolean foundMinSize = false; + for (IItemIterable items : events) { + IMemberAccessor sizeAccessor = SIZE.getAccessor(items.getType()); + IMemberAccessor addrAccessor = MALLOC_ADDRESS.getAccessor(items.getType()); + if (sizeAccessor == null) { + continue; + } + for (IItem item : items) { + IQuantity size = sizeAccessor.getMember(item); + assertTrue(size == null || size.longValue() > 0, "allocation size must be positive"); + if (size != null && size.longValue() >= 1024) { + foundMinSize = true; + } + if (addrAccessor != null) { + IQuantity addr = addrAccessor.getMember(item); + assertTrue(addr == null || addr.longValue() != 0, "malloc address must not be zero"); + } + } + } + assertTrue(foundMinSize, "expected at least one malloc event with size >= 1024 bytes"); + + // Both vm and vmx capture the Java call chain; allocateDirect must appear in stack traces + verifyStackTraces("profiler.Malloc", "allocateDirect"); + + buffers.clear(); // keep alive until after stop + } + +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketBytesAccuracyTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketBytesAccuracyTest.java new file mode 100644 index 000000000..5af4b835c --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketBytesAccuracyTest.java @@ -0,0 +1,136 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import 
org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; +import org.openjdk.jmc.common.unit.UnitLookup; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that time-weighted sampling produces a statistically reasonable + * estimate of total I/O time. With time-weighted inverse-transform sampling + * the invariant is: + *

+ *   E[ sum(weight * duration) ] = total_io_time
+ * 
+ * + * We verify that {@code sum(weight * duration_ns)} is positive and within a + * generous 100x tolerance of the actual test duration, confirming that the + * weight field reflects duration-based probability rather than a degenerate + * value. + */ +public class NativeSocketBytesAccuracyTest extends AbstractProfilerTest { + + private static final IAttribute DURATION_ATTR = + Attribute.attr("duration", "duration", "Duration", UnitLookup.TIMESPAN); + private static final IAttribute WEIGHT_ATTR = + Attribute.attr("weight", "weight", "weight", UnitLookup.NUMBER); + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us period keeps sampling probability high on short localhost I/O. + return "natsock=100us"; + } + + @RetryingTest(5) + public void timeWeightedEstimateIsWithinReasonableBounds() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + int payloadSize = 256 * 1024; + int iterations = 100; + + long wallStart = System.nanoTime(); + doTcpSend(payloadSize, iterations); + long wallEnd = System.nanoTime(); + long wallNs = wallEnd - wallStart; + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + double scaledDurationNs = 0.0; + long sendEventCount = 0; + for (IItemIterable items : events) { + IMemberAccessor opAccessor = OPERATION.getAccessor(items.getType()); + IMemberAccessor durationAccessor = DURATION_ATTR.getAccessor(items.getType()); + IMemberAccessor weightAccessor = WEIGHT_ATTR.getAccessor(items.getType()); + if (opAccessor == null || durationAccessor == null || weightAccessor == null) continue; + for (IItem item : items) { + if ("SEND".equals(opAccessor.getMember(item))) { + IQuantity dur = durationAccessor.getMember(item); + IQuantity weight = 
weightAccessor.getMember(item); + if (dur != null && weight != null) { + double durationNs = dur.doubleValueIn(UnitLookup.NANOSECOND); + scaledDurationNs += durationNs * weight.doubleValue(); + sendEventCount++; + } + } + } + } + + System.out.println("Wall time of transfers: " + wallNs + " ns"); + System.out.println("Scaled I/O time (sum of duration*weight): " + scaledDurationNs + " ns"); + System.out.println("SEND event count: " + sendEventCount); + + assertTrue(sendEventCount > 0, "No SEND events recorded"); + assertTrue(scaledDurationNs > 0.0, "sum(weight * duration) must be positive"); + + // Generous 100x tolerance: scaled estimate must not exceed 100x wall time. + // Lower bound is not enforced because very short I/O calls may all fall + // below the sampling threshold in a brief recording window. + assertTrue(scaledDurationNs <= wallNs * 100.0, + String.format("sum(weight * duration) = %.0f ns is implausibly large (wall=%d ns)", + scaledDurationNs, wallNs)); + } + + private void doTcpSend(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + } + } + serverThread.join(5000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketDisabledTest.java 
b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketDisabledTest.java new file mode 100644 index 000000000..8e74fcb02 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketDisabledTest.java @@ -0,0 +1,86 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.IItemCollection; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * Verifies that NativeSocketEvent events are absent when the 'nativesocket' + * profiler argument is not specified. + */ +public class NativeSocketDisabledTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux(); + } + + @Override + protected String getProfilerCommand() { + // cpu-only profiling, no nativesocket + return "cpu=10ms"; + } + + @RetryingTest(3) + public void noSocketEventsWithoutFeatureEnabled() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doTcpTransfer(64 * 1024, 8); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent", false); + assertFalse(events.hasItems(), + "NativeSocketEvent events must not appear when nativesocket argument is absent"); + } + + private void doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + byte[] buf = new 
byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + conn.getOutputStream().write(buf, 0, read); + conn.getOutputStream().flush(); + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + byte[] resp = new byte[payloadSize]; + InputStream in = client.getInputStream(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(5000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEnabledTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEnabledTest.java new file mode 100644 index 000000000..5f0f3e7f6 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEnabledTest.java @@ -0,0 +1,28 @@ +package com.datadoghq.profiler.nativesocket; + +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.IItemCollection; + +import com.datadoghq.profiler.Platform; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies that the 'nativesocket' profiler argument enables socket I/O tracking + * and that NativeSocketEvent JFR events are produced. 
+ */ +public class NativeSocketEnabledTest extends NativeSocketTestBase { + + @RetryingTest(3) + public void socketEventsProducedWhenFeatureEnabled() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doTcpTransfer(4096, 128); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "Expected NativeSocketEvent events to be present in JFR recording"); + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventFieldsTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventFieldsTest.java new file mode 100644 index 000000000..0b5964bf3 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventFieldsTest.java @@ -0,0 +1,106 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.IMCThread; +import org.openjdk.jmc.common.IMCStackTrace; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; +import org.openjdk.jmc.common.unit.UnitLookup; +import org.openjdk.jmc.flightrecorder.JfrAttributes; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that each NativeSocketEvent carries all required fields with valid values: + * eventThread, stackTrace, duration, operation (SEND/RECV), remoteAddress (ip:port), + * bytesTransferred (> 0), weight (> 0). 
+ */ +public class NativeSocketEventFieldsTest extends NativeSocketTestBase { + + private static final IAttribute REMOTE_ADDRESS = + Attribute.attr("remoteAddress", "remoteAddress", "Remote address", UnitLookup.PLAIN_TEXT); + private static final IAttribute BYTES_TRANSFERRED = + Attribute.attr("bytesTransferred", "bytesTransferred", "Bytes transferred", UnitLookup.MEMORY); + private static final IAttribute DURATION = + Attribute.attr("duration", "duration", "Duration", UnitLookup.TIMESPAN); + + @RetryingTest(3) + public void allRequiredFieldsPresentAndValid() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doTcpTransfer(4096, 128); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + boolean foundSend = false; + boolean foundRecv = false; + + for (IItemIterable items : events) { + IMemberAccessor operationAccessor = + OPERATION.getAccessor(items.getType()); + IMemberAccessor remoteAddressAccessor = + REMOTE_ADDRESS.getAccessor(items.getType()); + IMemberAccessor bytesAccessor = + BYTES_TRANSFERRED.getAccessor(items.getType()); + IMemberAccessor weightAccessor = + WEIGHT.getAccessor(items.getType()); + IMemberAccessor durationAccessor = + DURATION.getAccessor(items.getType()); + IMemberAccessor threadAccessor = + JfrAttributes.EVENT_THREAD.getAccessor(items.getType()); + IMemberAccessor stackTraceAccessor = + STACK_TRACE.getAccessor(items.getType()); + + assertNotNull(operationAccessor, "operation field accessor must be present"); + assertNotNull(remoteAddressAccessor, "remoteAddress field accessor must be present"); + assertNotNull(bytesAccessor, "bytesTransferred field accessor must be present"); + assertNotNull(weightAccessor, "weight field accessor must be present"); + assertNotNull(durationAccessor, "duration field accessor must be present"); + assertNotNull(threadAccessor, "eventThread 
field accessor must be present"); + assertNotNull(stackTraceAccessor, "stackTrace field accessor must be present"); + + for (IItem item : items) { + String operation = operationAccessor.getMember(item); + assertNotNull(operation, "operation must not be null"); + assertTrue(operation.equals("SEND") || operation.equals("RECV"), + "operation must be SEND or RECV, got: " + operation); + if ("SEND".equals(operation)) foundSend = true; + if ("RECV".equals(operation)) foundRecv = true; + + String remoteAddress = remoteAddressAccessor.getMember(item); + assertNotNull(remoteAddress, "remoteAddress must not be null"); + assertTrue(remoteAddress.contains(":"), + "remoteAddress must be in ip:port format, got: " + remoteAddress); + + IQuantity bytes = bytesAccessor.getMember(item); + assertNotNull(bytes, "bytesTransferred must not be null"); + assertTrue(bytes.longValue() > 0, + "bytesTransferred must be > 0, got: " + bytes); + + IQuantity weight = weightAccessor.getMember(item); + assertNotNull(weight, "weight must not be null"); + assertTrue(weight.doubleValue() > 0.0, + "weight must be > 0, got: " + weight); + + IQuantity duration = durationAccessor.getMember(item); + assertNotNull(duration, "duration must not be null"); + + IMCThread thread = threadAccessor.getMember(item); + assertNotNull(thread, "eventThread must not be null"); + } + } + + assertTrue(foundSend, "Expected at least one SEND event"); + assertTrue(foundRecv, "Expected at least one RECV event"); + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventThreadTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventThreadTest.java new file mode 100644 index 000000000..0aa1d45c4 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketEventThreadTest.java @@ -0,0 +1,104 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import 
com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.IMCThread; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.flightrecorder.JfrAttributes; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that the eventThread field on NativeSocketEvent is populated and + * names a non-empty thread name, indicating the I/O was attributed to the + * calling thread. + */ +public class NativeSocketEventThreadTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us period keeps sampling probability high on short localhost I/O. 
+ return "natsock=100us"; + } + + @RetryingTest(3) + public void eventThreadIsPopulated() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doTcpTransfer(64 * 1024, 16); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + for (IItemIterable items : events) { + IMemberAccessor threadAccessor = + JfrAttributes.EVENT_THREAD.getAccessor(items.getType()); + assertNotNull(threadAccessor, "eventThread accessor must be present"); + for (IItem item : items) { + IMCThread thread = threadAccessor.getMember(item); + assertNotNull(thread, "eventThread must not be null"); + String name = thread.getThreadName(); + assertNotNull(name, "thread name must not be null"); + assertFalse(name.isEmpty(), "thread name must not be empty"); + } + } + } + + private void doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + conn.getOutputStream().write(buf, 0, read); + conn.getOutputStream().flush(); + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + byte[] resp = new byte[payloadSize]; + InputStream in = client.getInputStream(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, 
payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(5000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketMacOsNoOpTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketMacOsNoOpTest.java new file mode 100644 index 000000000..c77b3e85c --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketMacOsNoOpTest.java @@ -0,0 +1,91 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.IItemCollection; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * On macOS the nativesocket feature is a no-op stub. + * Verifies that the profiler starts without error when 'nativesocket' is specified + * and that no NativeSocketEvent events appear in the recording. 
+ */ +public class NativeSocketMacOsNoOpTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isMac(); + } + + @Override + protected String getProfilerCommand() { + return "natsock"; + } + + @RetryingTest(3) + public void noEventsOnMacOS() throws Exception { + Assumptions.assumeTrue(Platform.isMac(), "This test targets macOS no-op behaviour"); + + doTcpTransfer(32 * 1024, 8); + + stopProfiler(); + + // verifyEvents with failOnEmpty=false: must not throw even if empty + IItemCollection events = verifyEvents("datadog.NativeSocketEvent", false); + assertNotNull(events); + assertFalse(events.hasItems(), + "NativeSocketEvent must not be emitted on macOS (no-op stub)"); + } + + private void doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + try { + InputStream in = conn.getInputStream(); + byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + conn.getOutputStream().write(buf, 0, read); + conn.getOutputStream().flush(); + } catch (IOException ignored) {} + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + byte[] resp = new byte[payloadSize]; + InputStream in = client.getInputStream(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(5000); + } + } +} diff --git 
a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRateLimitTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRateLimitTest.java new file mode 100644 index 000000000..841f883ac --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRateLimitTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2026 Datadog, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.IQuantity; +import org.openjdk.jmc.common.unit.UnitLookup; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Verifies that the sampler is active: when a large number of I/O operations are + * performed over a single persistent connection, only a fraction are recorded + * (events << 
operations), and the weight field is > 1 on at least some events, + * reflecting statistical significance. + * + * The target rate is ~5000 events/min; for a 10-second window that is ~833 events. + * We generate far more byte volume than that and assert the event count is + * substantially less than the operation count, and that at least one event + * carries weight > 1. + * + * A persistent connection is reused across all iterations to avoid the TCP + * handshake overhead that would make the test slow and unreliable on CI. + */ +public class NativeSocketRateLimitTest extends AbstractProfilerTest { + + private static final IAttribute WEIGHT_ATTR = + Attribute.attr("weight", "weight", "weight", UnitLookup.NUMBER); + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + return "natsock"; + } + + @RetryingTest(3) + public void eventCountIsSubstantiallyLessThanOperationCount() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + // Generate a high volume of byte-sized writes over a single connection. + // ~5000 iterations * 4 KB = 20 MB total — far above the ~5000-event/min rate limit. 
+ int operations = doHighRateTcpTransfer(4096, 5000); + System.out.println("Total TCP operations performed: " + operations); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + long eventCount = 0; + boolean foundWeightAboveOne = false; + + for (IItemIterable items : events) { + IMemberAccessor weightAccessor = + WEIGHT_ATTR.getAccessor(items.getType()); + for (IItem item : items) { + eventCount++; + if (weightAccessor != null) { + IQuantity w = weightAccessor.getMember(item); + if (w != null && w.doubleValue() > 1.0) { + foundWeightAboveOne = true; + } + } + } + } + + System.out.println("Recorded NativeSocketEvent count: " + eventCount); + + // Recorded events must be fewer than raw operations (subsampling is active) + assertTrue(eventCount < operations, + "Event count (" + eventCount + ") should be less than operation count (" + operations + ")"); + + // At least some events must have weight > 1, indicating byte-weighted sampling + assertTrue(foundWeightAboveOne, + "Expected at least one event with weight > 1 (byte-weighted inverse-transform sampling)"); + } + + /** + * Sends {@code iterations} writes of {@code payloadSize} bytes over a single + * persistent TCP connection and reads the echo back. Reusing the connection + * avoids per-iteration TCP handshake overhead that would otherwise make the + * workload too slow to reliably hit the rate limit within the recording window. 
+ * + * @return total number of individual send/recv calls performed (4 per iteration) + */ + private int doHighRateTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + + Thread serverThread = new Thread(() -> { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + OutputStream out = conn.getOutputStream(); + byte[] buf = new byte[payloadSize]; + for (int iter = 0; iter < iterations; iter++) { + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) return; + read += n; + } + out.write(buf, 0, payloadSize); + out.flush(); + } + } catch (IOException ignored) {} + }); + serverThread.setDaemon(true); + serverThread.start(); + + try (Socket client = new Socket("127.0.0.1", port)) { + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream(); + byte[] resp = new byte[payloadSize]; + for (int iter = 0; iter < iterations; iter++) { + out.write(payload); + out.flush(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(10000); + } + // Each iteration: 1 send + 1 recv on client; 1 recv + 1 send on server + return iterations * 4; + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRemoteAddressTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRemoteAddressTest.java new file mode 100644 index 000000000..04b46e6e8 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketRemoteAddressTest.java @@ -0,0 +1,119 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import 
org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.UnitLookup; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that the remoteAddress field in NativeSocketEvent is a non-empty + * string of the form "&lt;ip&gt;:&lt;port&gt;" matching the known server endpoint. + */ +public class NativeSocketRemoteAddressTest extends AbstractProfilerTest { + + private static final IAttribute REMOTE_ADDRESS = + Attribute.attr("remoteAddress", "remoteAddress", "Remote address", UnitLookup.PLAIN_TEXT); + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us period keeps sampling probability high enough that the 16 + // short-lived connections reliably produce at least one event whose + // remoteAddress points at the known server port. 
+ return "natsock=100us"; + } + + @RetryingTest(3) + public void remoteAddressIsIpColonPort() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + int serverPort = doTcpTransfer(64 * 1024, 16); + System.out.println("Server port: " + serverPort); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + boolean foundMatchingAddress = false; + for (IItemIterable items : events) { + IMemberAccessor addrAccessor = + REMOTE_ADDRESS.getAccessor(items.getType()); + assertNotNull(addrAccessor, "remoteAddress accessor must exist"); + for (IItem item : items) { + String addr = addrAccessor.getMember(item); + assertNotNull(addr, "remoteAddress must not be null"); + assertFalse(addr.isEmpty(), "remoteAddress must not be empty"); + // Must match ip:port pattern + assertTrue(addr.matches("^[\\d.]+:\\d+$") || addr.matches("^\\[.*\\]:\\d+$"), + "remoteAddress '" + addr + "' does not match expected ip:port format"); + if (addr.endsWith(":" + serverPort)) { + foundMatchingAddress = true; + } + } + } + assertTrue(foundMatchingAddress, + "Expected at least one event with remoteAddress pointing to server port " + serverPort); + } + + /** Returns the server's bound port. 
*/ + private int doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + conn.getOutputStream().write(buf, 0, read); + conn.getOutputStream().flush(); + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + byte[] resp = new byte[payloadSize]; + InputStream in = client.getInputStream(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(5000); + return port; + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketSendRecvSeparateTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketSendRecvSeparateTest.java new file mode 100644 index 000000000..b9195758b --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketSendRecvSeparateTest.java @@ -0,0 +1,104 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import 
org.openjdk.jmc.common.item.IMemberAccessor; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that SEND and RECV operations are tracked as independent events. + * Generates a workload where only one side performs writes so that events + * can be attributed unambiguously to the sending or receiving thread. + */ +public class NativeSocketSendRecvSeparateTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us initial sampling period: a single 256KB localhost write + // completes in ~100-500us giving P ~= 0.6-1.0 per call, so both + // SEND and RECV directions are sampled reliably over 32 iterations. + return "natsock=100us"; + } + + @RetryingTest(3) + public void sendAndRecvTrackedWithSeparateCounts() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + // Large volume of data to ensure sampler captures both directions + doUnidirectionalTransfer(256 * 1024, 32); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + long sendCount = 0; + long recvCount = 0; + + for (IItemIterable items : events) { + IMemberAccessor opAccessor = OPERATION.getAccessor(items.getType()); + assertNotNull(opAccessor); + for (IItem item : items) { + String op = opAccessor.getMember(item); + if ("SEND".equals(op)) sendCount++; + else if ("RECV".equals(op)) recvCount++; + } + } + + System.out.println("SEND events: " + sendCount + ", RECV events: " + recvCount); + assertTrue(sendCount > 0, "Expected at least one SEND event, got 0"); + assertTrue(recvCount > 0, "Expected at least one RECV 
event, got 0"); + } + + private void doUnidirectionalTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + CountDownLatch serverReady = new CountDownLatch(1); + + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + serverReady.countDown(); + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + serverReady.await(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + } + } + serverThread.join(5000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketStackTraceTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketStackTraceTest.java new file mode 100644 index 000000000..c9d779e75 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketStackTraceTest.java @@ -0,0 +1,107 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.flightrecorder.jdk.JdkAttributes; + +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; 
+import java.net.Socket; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that NativeSocketEvent events carry non-empty stack traces, + * and that a recognizable Java call site from the test workload appears + * in at least one event's stack trace. + */ +public class NativeSocketStackTraceTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us period keeps sampling probability high on short localhost I/O. + return "natsock=100us"; + } + + @RetryingTest(3) + public void stackTraceIsCapturedForSocketEvents() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doTcpTransfer(64 * 1024, 20); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent"); + assertTrue(events.hasItems(), "No NativeSocketEvent events found"); + + boolean foundNonEmptyStackTrace = false; + for (IItemIterable items : events) { + IMemberAccessor stackTraceAccessor = + JdkAttributes.STACK_TRACE_STRING.getAccessor(items.getType()); + if (stackTraceAccessor == null) continue; + for (IItem item : items) { + String st = stackTraceAccessor.getMember(item); + if (st != null && !st.isEmpty()) { + foundNonEmptyStackTrace = true; + break; + } + } + if (foundNonEmptyStackTrace) break; + } + assertTrue(foundNonEmptyStackTrace, "Expected at least one NativeSocketEvent with a non-empty stack trace"); + } + + // Named method so it can appear as a recognizable frame + private void doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + for (int iter = 0; iter < iterations; iter++) { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); 
+ byte[] buf = new byte[payloadSize]; + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) break; + read += n; + } + conn.getOutputStream().write(buf, 0, read); + conn.getOutputStream().flush(); + } catch (IOException ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + for (int iter = 0; iter < iterations; iter++) { + try (Socket client = new Socket("127.0.0.1", port)) { + client.getOutputStream().write(payload); + client.getOutputStream().flush(); + byte[] resp = new byte[payloadSize]; + InputStream in = client.getInputStream(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(5000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketTestBase.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketTestBase.java new file mode 100644 index 000000000..ae826db20 --- /dev/null +++ b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketTestBase.java @@ -0,0 +1,78 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; + +/** Base class for native-socket profiler tests that need a persistent TCP workload. */ +abstract class NativeSocketTestBase extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us initial period keeps P high enough that fast localhost I/O + // reliably produces events across small test workloads. 
+ return "natsock=100us"; + } + + /** + * Sends {@code iterations} writes of {@code payloadSize} bytes over a single + * persistent TCP connection and reads the echo back. Reusing the connection + * fills the TCP send buffer after ~32 iterations (at 4 KB/write, 128 KB OS buffer), + * causing subsequent writes to block for >1 ms and driving the Poisson sampler + * probability close to 1.0 for those calls. + */ + protected void doTcpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payloadSize; i++) payload[i] = (byte) (i & 0xFF); + + try (ServerSocket server = new ServerSocket(0)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + try (Socket conn = server.accept()) { + InputStream in = conn.getInputStream(); + OutputStream out = conn.getOutputStream(); + byte[] buf = new byte[payloadSize]; + for (int iter = 0; iter < iterations; iter++) { + int read = 0; + while (read < payloadSize) { + int n = in.read(buf, read, payloadSize - read); + if (n < 0) return; + read += n; + } + out.write(buf, 0, payloadSize); + out.flush(); + } + } catch (IOException ignored) {} + }); + serverThread.setDaemon(true); + serverThread.start(); + + try (Socket client = new Socket("127.0.0.1", port)) { + OutputStream out = client.getOutputStream(); + InputStream in = client.getInputStream(); + byte[] resp = new byte[payloadSize]; + for (int iter = 0; iter < iterations; iter++) { + out.write(payload); + out.flush(); + int read = 0; + while (read < payloadSize) { + int n = in.read(resp, read, payloadSize - read); + if (n < 0) break; + read += n; + } + } + } + serverThread.join(10000); + } + } +} diff --git a/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketUdpExcludedTest.java b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketUdpExcludedTest.java new file mode 100644 index 000000000..b0b3a48bc --- /dev/null +++ 
b/ddprof-test/src/test/java/com/datadoghq/profiler/nativesocket/NativeSocketUdpExcludedTest.java @@ -0,0 +1,84 @@ +package com.datadoghq.profiler.nativesocket; + +import com.datadoghq.profiler.AbstractProfilerTest; +import com.datadoghq.profiler.Platform; +import org.junit.jupiter.api.Assumptions; +import org.junitpioneer.jupiter.RetryingTest; +import org.openjdk.jmc.common.item.Attribute; +import org.openjdk.jmc.common.item.IAttribute; +import org.openjdk.jmc.common.item.IItem; +import org.openjdk.jmc.common.item.IItemCollection; +import org.openjdk.jmc.common.item.IItemIterable; +import org.openjdk.jmc.common.item.IMemberAccessor; +import org.openjdk.jmc.common.unit.UnitLookup; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Verifies that UDP (DatagramSocket / sendto / recvfrom) transfers do NOT + * produce NativeSocketEvent events. Only TCP blocking send/recv are in scope. + * + * This test performs only UDP transfers and expects zero NativeSocketEvent + * events in the recording. + */ +public class NativeSocketUdpExcludedTest extends AbstractProfilerTest { + + @Override + protected boolean isPlatformSupported() { + return Platform.isLinux() && !Platform.isMusl(); + } + + @Override + protected String getProfilerCommand() { + // 100us period strengthens the negative assertion: any UDP traffic + // that accidentally leaks through the TCP filter would be far more + // likely to produce a sampled event at this tighter interval. 
+ return "natsock=100us"; + } + + @RetryingTest(3) + public void udpTransfersProduceNoSocketEvents() throws Exception { + Assumptions.assumeTrue(Platform.isLinux(), "nativesocket tracking is Linux-only"); + + doUdpTransfer(1024, 500); + + stopProfiler(); + + IItemCollection events = verifyEvents("datadog.NativeSocketEvent", false); + assertNotNull(events); + assertFalse(events.hasItems(), + "NativeSocketEvent must not be produced for UDP (sendto/recvfrom) transfers"); + } + + private void doUdpTransfer(int payloadSize, int iterations) throws Exception { + byte[] payload = new byte[payloadSize]; + InetAddress loopback = InetAddress.getLoopbackAddress(); + + try (DatagramSocket server = new DatagramSocket(0, loopback)) { + int port = server.getLocalPort(); + Thread serverThread = new Thread(() -> { + byte[] buf = new byte[payloadSize]; + DatagramPacket pkt = new DatagramPacket(buf, buf.length); + for (int iter = 0; iter < iterations; iter++) { + try { + server.receive(pkt); + } catch (Exception ignored) {} + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + try (DatagramSocket client = new DatagramSocket()) { + DatagramPacket pkt = new DatagramPacket(payload, payload.length, loopback, port); + for (int iter = 0; iter < iterations; iter++) { + client.send(pkt); + } + } + serverThread.join(5000); + } + } +} diff --git a/doc/architecture/NativeMemoryProfiling.md b/doc/architecture/NativeMemoryProfiling.md new file mode 100644 index 000000000..6dcd9aa03 --- /dev/null +++ b/doc/architecture/NativeMemoryProfiling.md @@ -0,0 +1,293 @@ +# Native Memory Allocation Profiling + +## Overview + +The native memory profiler tracks heap allocations made through the C standard +library (`malloc`, `calloc`, `realloc`, `posix_memalign`, `aligned_alloc`). It +instruments these functions at the GOT (Global Offset Table) level so that every +intercepted call is accounted for without modifying application source code or +requiring a custom allocator. 
The `free` function is also hooked (to forward calls
+correctly through the GOT) but free events are not recorded.
+
+Sampled allocation events carry a full Java + native stack trace and are emitted as
+`profiler.Malloc` JFR events.
+
+The feature is activated by passing `nativemem=<interval>` to the profiler, where
+`<interval>` is the byte-sampling interval in bytes (e.g. `nativemem=524288` samples roughly
+one event per 512 KiB allocated). Passing `nativemem=0` records every allocation.
+
+---
+
+## Component Map
+
+```
+ Application code
+ │ malloc() / calloc() / realloc() / free() / …
+ ▼
+ ┌─────────────┐ GOT patch ┌──────────────────────────┐
+ │ libc / musl│ ◄────────── │ malloc_hook / free_hook │ mallocTracer.cpp
+ └─────────────┘ │ calloc_hook / … │
+ └────────────┬─────────────┘
+ │ recordMalloc
+ ▼
+ ┌──────────────────────────┐
+ │ MallocTracer:: │ mallocTracer.cpp/h
+ │ shouldSample() │
+ │ recordSample() ──────► │ profiler.cpp
+ └────────────┬─────────────┘
+ │ walkVM (CSTACK_VM)
+ ▼
+ ┌──────────────────────────┐
+ │ JFR buffer │ flightRecorder.cpp
+ │ profiler.Malloc │
+ └──────────────────────────┘
+```
+
+---
+
+## GOT Patching
+
+The profiler redirects allocator calls by writing hook function addresses directly
+into the importing library's GOT. This is cheaper than `LD_PRELOAD` (no process
+restart) and works for libraries loaded at any time.
+
+### Import IDs
+
+`codeCache.h` defines an `ImportId` enum with one entry per hooked symbol:
+
+```
+im_malloc, im_calloc, im_realloc, im_free, im_posix_memalign, im_aligned_alloc
+```
+
+`CodeCache::patchImport(ImportId, void*)` walks the library's PLT/GOT and overwrites
+the matching entry.
+ +### Hook signatures + +Each hook calls the saved original function first, then records the event: + +| Hook | Calls | Records | +|------|-------|---------| +| `malloc_hook(size)` | `_orig_malloc(size)` | `recordMalloc(ret, size)` if `ret != NULL && size != 0` | +| `calloc_hook(num, size)` | `_orig_calloc(num, size)` | `recordMalloc(ret, num*size)` if `ret != NULL` | +| `realloc_hook(addr, size)` | `_orig_realloc(addr, size)` | `recordMalloc(ret, size)` if `ret != NULL && size > 0` | +| `free_hook(addr)` | `_orig_free(addr)` | — (forwards only) | +| `posix_memalign_hook(…)` | `_orig_posix_memalign(…)` | `recordMalloc(*memptr, size)` if `ret == 0` | +| `aligned_alloc_hook(align, size)` | `_orig_aligned_alloc(align, size)` | `recordMalloc(ret, size)` if `ret != NULL` | + +--- + +## Initialization Sequence + +`MallocTracer::start()` (called once per profiler session) runs: + +1. **`resolveMallocSymbols()`** — calls each intercepted function at least once so + the profiler library's own PLT stubs are resolved by the dynamic linker. This + ensures that subsequent `SAVE_IMPORT` reads get the real libc function pointers + rather than the PLT resolver. + +2. **`SAVE_IMPORT(func)`** — reads the resolved GOT entry for each symbol from the + profiler library's own import table and stores it in the corresponding + `_orig_` static pointer. + +3. **`detectNestedMalloc()`** — probes whether the platform's `calloc` implementation + calls `malloc` internally (as musl does). If so, `calloc_hook` and + `posix_memalign_hook` are replaced with dummy variants (`calloc_hook_dummy`, + `posix_memalign_hook_dummy`) that forward to the originals without recording, to + prevent double-accounting. The dummy hooks preserve the caller frame pointer so + that the actual call site is not obscured. + +4. **`patchLibraries()`** — iterates over all currently loaded native libraries and + writes the hook addresses into each library's GOT, under `_patch_lock`. 
+ `_patched_libs` is a monotonic counter so that already-patched libraries are + skipped on subsequent calls. + +`_initialized` is set to `true` after the first successful `initialize()` call. +`patchLibraries()` is called again on every subsequent `start()` to pick up any +libraries loaded between profiler sessions. + +--- + +## Dynamic Library Handling + +When the application calls `dlopen`, the profiler's `dlopen_hook` (installed as a +GOT hook for `dlopen`) calls `MallocTracer::installHooks()` after the library is +loaded: + +```cpp +// profiler.cpp +void* Profiler::dlopen_hook(const char* filename, int flags) { + void* result = dlopen(filename, flags); + if (result != NULL) { + Libraries::instance()->updateSymbols(false); + MallocTracer::installHooks(); + } + return result; +} +``` + +`installHooks()` calls `patchLibraries()` only if `_running` is `true`, so newly +loaded libraries are automatically hooked without requiring a profiler restart. + +--- + +## Sampling + +Allocation recording uses Poisson-interval sampling via `MallocTracer::shouldSample()`: + +```cpp +// mallocTracer.cpp — lock-free CAS loop with Poisson jitter +static bool shouldSample(size_t size) { + if (_interval <= 1) return true; // nativemem=0: record every allocation + while (true) { + u64 prev = _bytes_until_sample; + if (prev > size) { + if (__sync_bool_compare_and_swap(&_bytes_until_sample, prev, prev - size)) + return false; + } else { + if (__sync_bool_compare_and_swap(&_bytes_until_sample, prev, nextPoissonInterval())) + return true; + } + } +} +``` + +`_bytes_until_sample` is a shared volatile counter decremented by each allocation's +size. When exhausted, a new Poisson-distributed interval is generated via +`nextPoissonInterval()` (using `-interval * ln(uniform_random)`), providing random +jitter that avoids synchronization artifacts. Multiple threads compete via CAS so no +mutex is needed. 
+ +A PID controller (`updateConfiguration()`) periodically adjusts `_interval` to +maintain approximately `TARGET_SAMPLES_PER_WINDOW` (100) samples per second. + +--- + +## Stack Trace Capture + +### Why `CSTACK_VM` is needed + +The malloc hooks execute on the calling thread with no signal context (`ucontext == +NULL`). Native stack unwinding via frame pointers or DWARF requires a signal context +as the starting point, so neither `CSTACK_DEFAULT` nor `CSTACK_FP` can produce +useful traces for malloc events. + +`CSTACK_VM` starts from `__builtin_return_address` for the initial frame and walks +native frames via DWARF. It then uses HotSpot's `JavaFrameAnchor` (lastJavaPC / +lastJavaSP / lastJavaFP) to transition from native to Java frames. This works +correctly from inside a malloc hook because the anchor is set whenever the JVM has +transitioned from Java to native. + +### Default stack mode + +`CSTACK_DEFAULT` is the initial default (`arguments.h`). At profiler start, +`profiler.cpp` promotes it to `CSTACK_VM` when VMStructs are available on the +platform. If VMStructs are not available, it stays at `CSTACK_DEFAULT`: + +```cpp +} else if (_cstack == CSTACK_VM) { + if (!VMStructs::hasStackStructs()) { + _cstack = CSTACK_DEFAULT; + Log::warn("VMStructs stack walking is not supported …"); + } +} +``` + +### Code path in `recordSample` + +```cpp +// profiler.cpp — stack walking for malloc events +if (event_type == BCI_NATIVE_MALLOC && _cstack >= CSTACK_VM) { + // walkVM walks native frames via DWARF/FP and, when native unwinding + // fails, falls back to the JavaFrameAnchor to reach Java frames. 
+ int vm_frames = StackWalker::walkVM(ucontext /*NULL*/, frames + num_frames, + max_remaining, _features, + eventTypeFromBCI(event_type), + lock_index, &truncated); + num_frames += vm_frames; +} +``` + +`getNativeTrace` returns 0 immediately for `_cstack >= CSTACK_VM` (line 316), so +native frames from `walkFP`/`walkDwarf` are not collected — `walkVM` is the sole +source of both native and Java frames for malloc events. + +--- + +## JFR Event Format + +A single event type is defined in `jfrMetadata.cpp` under the +`Java Virtual Machine / Native Memory` category: + +### `profiler.Malloc` (`T_MALLOC`) + +| Field | Type | Description | +|-------|------|-------------| +| `startTime` | `long` (ticks) | TSC timestamp of the allocation | +| `eventThread` | thread ref | Thread that performed the allocation | +| `stackTrace` | stack trace ref | Call stack at the allocation site | +| `address` | `long` (address) | Returned pointer value | +| `size` | `long` (bytes) | Requested allocation size | +| `weight` | `float` | Statistical sample weight based on Poisson sampling probability | + +Events are written by `Recording::recordMallocSample()` in `flightRecorder.cpp`: + +```cpp +buf->putVar64(T_MALLOC); +buf->putVar64(event->_start_time); +buf->putVar32(tid); +buf->putVar64(call_trace_id); +buf->putVar64(event->_address); +buf->putVar64(event->_size); +buf->putFloat(event->_weight); +writeContext(buf, Contexts::get()); +``` + +--- + +## Concurrency and Thread Safety + +| Concern | Mechanism | +|---------|-----------| +| GOT patching across threads | `_patch_lock` (Mutex) in `patchLibraries()` | +| Library unload during patching | `UnloadProtection` handle per library | +| Allocation byte counter | Lock-free CAS loop in `shouldSample` | +| JFR buffer writes | Per-lock-index try-lock with 3 attempts; events dropped on contention | +| Hook enable / disable | `volatile bool _running` — checked before every recording call | +| `_initialized` write ordering | Serialized by the 
profiler's outer state lock (caller responsibility) | + +--- + +## Known Limitations and Design Trade-offs + +**No reentrancy guard.** As documented in `mallocTracer.cpp`: + +> To avoid complexity in hooking and tracking reentrancy, a TLS-based approach is +> not used. Reentrant allocation calls would result in double-accounting. + +When `recordMalloc` calls into the profiler (stack walking, JFR buffer writes), any +allocations made by the profiler itself will re-enter the hooks. Infinite recursion +is prevented because the hook functions call `_orig_malloc` (a saved direct function +pointer) instead of going through the GOT, but profiler-internal allocations may be +double-counted as application allocations. +Leak detection is unaffected: the same address being recorded multiple times is +handled correctly by the tracking logic. + +**Hooks are never uninstalled.** `stop()` only sets `_running = false`. The GOT +entries remain patched for the lifetime of the process. After stopping, every +malloc/free incurs the overhead of one function-pointer indirection plus a volatile +bool read, which is negligible in practice. Uninstalling hooks safely would require +iterating all libraries again under `_patch_lock`, which is deferred. + +**`nativemem=0` records every allocation.** When `_interval == 0`, +`shouldSample` returns `true` on every call (the `interval <= 1` fast path). This +is intentional for 100% sampling but can produce very high event volumes. + +**No free event tracking.** Free calls are hooked (to forward through the GOT +correctly) but not recorded. Sampled mallocs mean most frees would match nothing, +and the immense event volume with no stack traces provides no actionable insight. + +**HotSpot / Linux only for full stack traces.** `CSTACK_VM` requires +`VMStructs::hasStackStructs()`, which is only true on HotSpot JVMs on Linux. On other +platforms the profiler falls back to `CSTACK_DEFAULT` and malloc events will have +empty stack traces.