#pragma once #ifdef OS_LINUX #include #include #include #include #include #include namespace DB { inline Int64 futexWait(void * address, UInt32 value) { return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0); } inline Int64 futexWake(void * address, int count) { return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0); } inline void futexWaitFetch(std::atomic & address, UInt32 & value) { futexWait(&address, value); value = address.load(); } inline void futexWakeOne(std::atomic & address) { futexWake(&address, 1); } inline void futexWakeAll(std::atomic & address) { futexWake(&address, INT_MAX); } constexpr UInt32 lowerHalf(UInt64 value) { return static_cast(value & 0xffffffffull); } constexpr UInt32 upperHalf(UInt64 value) { return static_cast(value >> 32ull); } inline UInt32 * lowerHalfAddress(void * address) { return reinterpret_cast(address) + (std::endian::native == std::endian::big); } inline UInt32 * upperHalfAddress(void * address) { return reinterpret_cast(address) + (std::endian::native == std::endian::little); } inline void futexWaitLowerFetch(std::atomic & address, UInt64 & value) { futexWait(lowerHalfAddress(&address), lowerHalf(value)); value = address.load(); } inline void futexWakeLowerOne(std::atomic & address) { futexWake(lowerHalfAddress(&address), 1); } inline void futexWakeLowerAll(std::atomic & address) { futexWake(lowerHalfAddress(&address), INT_MAX); } inline void futexWaitUpperFetch(std::atomic & address, UInt64 & value) { futexWait(upperHalfAddress(&address), upperHalf(value)); value = address.load(); } inline void futexWakeUpperOne(std::atomic & address) { futexWake(upperHalfAddress(&address), 1); } inline void futexWakeUpperAll(std::atomic & address) { futexWake(upperHalfAddress(&address), INT_MAX); } } #endif