00001 /*************************************************************************** 00002 * RCS INFORMATION: 00003 * 00004 * $RCSfile: WKFThreads.C,v $ 00005 * $Author: johns $ $Locker: $ $State: Exp $ 00006 * $Revision: 1.36 $ $Date: 2022年04月08日 08:06:44 $ 00007 * 00008 ***************************************************************************/ 00034 /* Tachyon copyright reproduced below */ 00035 /* 00036 * Copyright (c) 1994-2016 John E. Stone 00037 * All rights reserved. 00038 * 00039 * Redistribution and use in source and binary forms, with or without 00040 * modification, are permitted provided that the following conditions 00041 * are met: 00042 * 1. Redistributions of source code must retain the above copyright 00043 * notice, this list of conditions and the following disclaimer. 00044 * 2. Redistributions in binary form must reproduce the above copyright 00045 * notice, this list of conditions and the following disclaimer in the 00046 * documentation and/or other materials provided with the distribution. 00047 * 3. The name of the author may not be used to endorse or promote products 00048 * derived from this software without specific prior written permission. 00049 * 00050 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS 00051 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00052 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00053 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 00054 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00055 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 00056 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 00057 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00058 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 00059 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 00060 * SUCH DAMAGE. 
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#if defined(__linux)
#define _GNU_SOURCE 1
#include <sched.h>   /* sched_getaffinity()/sched_setaffinity() */
#endif

#include "WKFThreads.h"

/* needed for CPU info APIs and flag macros */
#if (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300)) || (defined(_MSC_VER) && (_MSC_VER >= 1916))
#include <emmintrin.h>
#include <immintrin.h>
#endif

#ifdef _MSC_VER
#if 0
#define WKFUSENEWWIN32APIS 1
#define _WIN32_WINNT 0x0400
#define WINVER 0x0400
#endif
#include <windows.h>
#include <winbase.h>
#endif

#if defined(_AIX) || defined(_CRAY) || defined(__irix) || defined(__linux) || defined(__osf__) || defined(__sun)
#include <unistd.h>   /* sysconf() CPU count queries */
#endif

#if defined(__APPLE__) && defined(WKFTHREADS)
#if 1
#include <sys/types.h>
#include <sys/sysctl.h>      /* sysctl() CPU count query */
#else
#include <Carbon/Carbon.h>   /* legacy Carbon Multiprocessing API */
#endif
#endif

#if defined(__linux) && (defined(ARCH_LINUXARM64) || defined(__ARM_ARCH_ISA_A64) || defined(__ARM_NEON))
#include <sys/auxv.h>        /* getauxval() HWCAP probes for ARM64 SIMD */
#endif

#if defined(__hpux)
#include <sys/mpctl.h>       /* HP-UX mpctl() processor queries */
#endif


#ifdef __cplusplus
extern "C" {
#endif

/*
 * Query the number of physical processors via whichever platform API
 * is available at compile time.  Returns 1 when no query mechanism
 * exists, when a query fails, or for non-threaded builds.
 */
int wkf_thread_numphysprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
#if defined(__APPLE__)
#if 1
  int rc;
  int mib[2];
  u_int miblen;
  size_t alen = sizeof(a);
  mib[0] = CTL_HW;
  mib[1] = HW_AVAILCPU;   /* number of CPUs available to run processes */
  miblen = 2;
  rc = sysctl(mib, miblen, &a, &alen, NULL, 0);
  if (rc < 0) {
    perror("Error during sysctl() query for CPU count");
    a = 1;   /* fall back to a single CPU on query failure */
  }
#else
  a = MPProcessorsScheduled();   /* number of active/running CPUs */
#endif
#endif

#ifdef _MSC_VER
  struct _SYSTEM_INFO sysinfo;
  GetSystemInfo(&sysinfo);
  a = sysinfo.dwNumberOfProcessors;   /* total number of CPUs */
#endif /* _MSC_VER */

#if defined(__PARAGON__)
  a=2;   /* Intel Paragon nodes each have two CPUs */
#endif /* __PARAGON__ */

#if defined(_CRAY)
  a = sysconf(_SC_CRAY_NCPU);
#endif

#if defined(ANDROID) || defined(USEPHYSCPUCOUNT)
  /* Android devices and the NVIDIA/SECO "CARMA" and "Kayla"   */
  /* boards toggles cores on/off according to system activity, */
  /* thermal management, and battery state.  For now, we will  */
  /* use as many threads as the number of physical cores since */
  /* the number that are online may vary even over a 2 second  */
  /* time window.  We will likely have this issue on many more */
  /* platforms as power management becomes more important...   */

  /* use sysconf() for initial guess, although it produces incorrect    */
  /* results on the older android releases due to a bug in the platform */
  a = sysconf(_SC_NPROCESSORS_CONF);

  /* check CPU count by parsing /sys/devices/system/cpu/present and use */
  /* whichever result gives the larger CPU count...                     */
  {
    int rc=0, b=1, i=-1, j=-1;
    FILE *ifp;

    ifp = fopen("/sys/devices/system/cpu/present", "r");
    if (ifp != NULL) {
      rc = fscanf(ifp, "%d-%d", &i, &j); /* read and interpret line */
      fclose(ifp);

      if (rc == 2 && i == 0) {
        b = j+1; /* 2 or more cores exist */
      }
    }

    /* return the greater CPU count result... */
    a = (a > b) ? a : b;
  }
#else
#if defined(__sun) || defined(__linux) || defined(__osf__) || defined(_AIX)
  a = sysconf(_SC_NPROCESSORS_ONLN);   /* CPUs currently online */
#endif /* SunOS, and similar... */
#endif /* Android */

#if defined(__irix)
  a = sysconf(_SC_NPROC_ONLN);
#endif /* IRIX */

#if defined(__hpux)
  a = mpctl(MPC_GETNUMSPUS, 0, 0);
#endif /* HPUX */
#endif /* WKFTHREADS */

  return a;
}


/*
 * Return the number of processors to use for threaded execution.
 * The WKFFORCECPUCOUNT (or VMD-specific VMDFORCECPUCOUNT) environment
 * variable overrides the detected hardware CPU count, which is useful
 * for scalability testing and debugging.
 */
int wkf_thread_numprocessors(void) {
  int a=1;

#ifdef WKFTHREADS
  /* Allow the user to override the number of CPUs for use */
  /* in scalability testing, debugging, etc.               */
  char *forcecount = getenv("WKFFORCECPUCOUNT");

  /* VMD specific env variable */
  if (forcecount == NULL)
    forcecount = getenv("VMDFORCECPUCOUNT");

  if (forcecount != NULL) {
    if (sscanf(forcecount, "%d", &a) == 1) {
      return a; /* if we got a valid count, return it */
    } else {
      a=1;      /* otherwise use the real available hardware CPU count */
    }
  }

  /* otherwise return the number of physical processors currently available */
  a = wkf_thread_numphysprocessors();

  /* XXX we should add checking for the current CPU affinity masks here, */
  /* and return the min of the physical processor count and CPU affinity */
  /* mask enabled CPU count.                                             */
#endif /* WKFTHREADS */

  return a;
}


/*
 * Functions supporting processor-specific runtime dispatch for hand-written
 * kernels using SIMD vector intrinsics or other highly specialized routines.
 */
#define WKF_USEINTCPUID 1
#if defined(WKF_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER) || (defined(_MSC_VER) && (_MSC_VER >= 1916))) && (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86) || defined(_M_AMD64))
#if 1
/* Execute the CPUID instruction for leaf 'eax', subleaf 'ecx', storing   */
/* the resulting EAX/EBX/ECX/EDX register values into abcd[0..3].         */
//static void wkf_cpuid(uint32_t eax, uint32_t ecx, uint32_t* abcd) {
static void wkf_cpuid(unsigned int eax, unsigned int ecx, unsigned int* abcd) {
#if defined(_MSC_VER)
  __cpuidex((int*)abcd, eax, ecx);
#else
//  uint32_t ebx, edx;
  unsigned int ebx=0, edx=0;
#if defined(__i386__) && defined (__PIC__)
  /* in case of PIC under 32-bit EBX cannot be clobbered */
  __asm__("movl %%ebx, %%edi \n\t cpuid \n\t xchgl %%ebx, %%edi" : "=D" (ebx),
#else
  __asm__("cpuid" : "+b" (ebx),
#endif
          "+a" (eax), "+c" (ecx), "=d" (edx));
  abcd[0] = eax; abcd[1] = ebx; abcd[2] = ecx; abcd[3] = edx;
#endif
}
#else
/* Alternate CPUID helper (disabled): EBX preserved via EDI exchange. */
static void wkf_cpuid(unsigned int eax, unsigned int ecx, unsigned int *info) {
  __asm__ __volatile__(
    "xchg %%ebx, %%edi;"
    "cpuid;"
    "xchg %%ebx, %%edi;"
    :"=a" (info[0]), "=D" (info[1]), "=c" (info[2]), "=d" (info[3])
    :"0" (eax)
  );
}
#endif

/* Read extended control register 'index' via XGETBV, used to verify */
/* that the OS saves/restores XMM/YMM/ZMM register state on context   */
/* switches before enabling AVX/AVX-512 code paths.                   */
static unsigned long long wkf_xgetbv(unsigned int index) {
#if defined(_MSC_VER)
  return _xgetbv(index);
#else
  unsigned int eax=0, edx=0;
  __asm__ __volatile__(
    "xgetbv;"
    : "=a" (eax), "=d"(edx)
    : "c" (index)
  );
  return ((unsigned long long) edx << 32) | eax;
#endif
}
#endif


/*
 * Determine the SIMD instruction-set capabilities and SMT depth of the
 * host CPU, storing the results into 'cpucaps'.  Returns 0 on success,
 * 1 if capabilities could not be determined.
 */
int wkf_cpu_capability_flags(wkf_cpu_caps_t *cpucaps) {
  int flags=CPU_UNKNOWN;
  int smtdepth = CPU_SMTDEPTH_UNKNOWN;

#if defined(WKF_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER) || (defined(_MSC_VER) && (_MSC_VER >= 1916))) && (defined(__i386__) || defined(__x86_64__) || defined(_M_IX86) || \
defined(_M_AMD64)) 00297 #define WKF_INTERNAL_ENABLE_CPUCAP_BAILOUT 1 00298 // https://software.intel.com/content/www/us/en/develop/articles/how-to-detect-new-instruction-support-in-the-4th-generation-intel-core-processor-family.html 00299 // https://stackoverflow.com/questions/6121792/how-to-check-if-a-cpu-supports-the-sse3-instruction-set 00300 // https://gist.github.com/hi2p-perim/7855506 00301 // http://www.hugi.scene.org/online/coding/hugi%2016%20-%20corawhd4.htm 00302 // http://www.geoffchappell.com/studies/windows/km/cpu/precpuid.htm 00303 // http://www.geoffchappell.com/studies/windows/km/cpu/cpuid/index.htm 00304 // https://www.sandpile.org/x86/cpuid.htm 00305 // https://lemire.me/blog/2020/07/17/the-cost-of-runtime-dispatch/ 00306 // https://github.com/google/cpu_features/ 00307 // https://github.com/klauspost/cpuid 00308 // https://github.com/anrieff/libcpuid/tree/master/libcpuid 00309 // Considerations about clock rate capping and false dependencies 00310 // when high AVX/AVX-512 registers are considered "in use" with 00311 // stale data, unless cleared, e.g., by _mm256_zeroupper(): 00312 // https://blog.cloudflare.com/on-the-dangers-of-intels-frequency-scaling/ 00313 // https://www.agner.org/optimize/blog/read.php?i=857 00314 unsigned int vendcpuinfo[4] = { 0 }; 00315 unsigned int cpuinfo[4] = { 0 }; 00316 unsigned long long xcrFeatureMask = 0; 00317 int havexmmymm = 0; 00318 int havezmmmask = 0; 00319 int haveosxsave = 0; 00320 00321 wkf_cpuid(0, 0, vendcpuinfo); /* get vendor string, highest function code */ 00322 if (vendcpuinfo[0] == 0) 00323 goto nocpuinfo; /* bail on very primitive CPU type, max fctn code==0 */ 00324 00325 wkf_cpuid(1, 0, cpuinfo); /* get various SIMD extension flags */ 00326 haveosxsave = (cpuinfo[2] & (1 << 27)) != 0; /* OS save/restore xmm regs */ 00327 00328 flags = 0; 00329 flags |= ((cpuinfo[2] & (1 << 19)) != 0) * CPU_SSE4_1; 00330 flags |= ((cpuinfo[2] & (1 << 29)) != 0) * CPU_F16C; 00331 flags |= ((cpuinfo[2] & (1 << 
31)) != 0) * CPU_HYPERVISOR; 00332 flags |= ((cpuinfo[3] & (1 << 26)) != 0) * CPU_SSE2; 00333 flags |= ((cpuinfo[3] & (1 << 28)) != 0) * CPU_HT; 00334 00335 /* if we have AVX, we need to call xgetbv too */ 00336 if ((cpuinfo[2] & (1 << 28)) != 0) { 00337 xcrFeatureMask = wkf_xgetbv(0); 00338 havexmmymm = (xcrFeatureMask & 0x06) == 0x06; 00339 havezmmmask = (xcrFeatureMask & 0xE6) == 0xE6; 00340 } 00341 00342 flags |= (((cpuinfo[2] & (1 << 12)) != 0) && 00343 havexmmymm && haveosxsave) * CPU_FMA; 00344 00345 flags |= (((cpuinfo[2] & (1 << 28)) != 0) && 00346 havexmmymm && haveosxsave) * CPU_AVX; 00347 00348 /* check that we can call CPUID function 7 */ 00349 if (cpuinfo[0] >= 0x7) { 00350 unsigned int extcpuinfo[4] = { 0 }; 00351 wkf_cpuid(7, 0, extcpuinfo); 00352 00353 flags |= (((extcpuinfo[1] & (1 << 5)) != 0) && 00354 havexmmymm && haveosxsave) * CPU_AVX2; 00355 00356 flags |= (((extcpuinfo[1] & (1 << 16)) != 0) && 00357 havezmmmask && haveosxsave) * CPU_AVX512F; 00358 flags |= (((extcpuinfo[1] & (1 << 26)) != 0) && 00359 havezmmmask && haveosxsave) * CPU_AVX512PF; 00360 flags |= (((extcpuinfo[1] & (1 << 27)) != 0) && 00361 havezmmmask && haveosxsave) * CPU_AVX512ER; 00362 flags |= (((extcpuinfo[1] & (1 << 28)) != 0) && 00363 havezmmmask && haveosxsave) * CPU_AVX512CD; 00364 } 00365 00366 smtdepth = 1; 00367 if (flags & CPU_HT) { 00368 #if 1 00369 /* XXX correct this for Phi, OS/BIOS settings */ 00370 smtdepth = 2; 00371 00372 /* XXX Hack to detect Xeon Phi CPUs since no other CPUs */ 00373 /* support AVX-512ER or AVX-512PF (yet...) 
*/ 00374 if ((flags & CPU_AVX512ER) && (flags & CPU_AVX512PF)) { 00375 smtdepth = 4; 00376 } 00377 #else 00378 int logicalcores = (cpuinfo[1] >> 16) && 0xFF; 00379 int physicalcores = logicalcores; 00380 char vendor[16] = { 0 }; 00381 ((unsigned *)vendor)[0] = vendcpuinfo[1]; 00382 ((unsigned *)vendor)[1] = vendcpuinfo[3]; 00383 ((unsigned *)vendor)[2] = vendcpuinfo[2]; 00384 00385 /* hmm, not quite right yet */ 00386 if (!strcmp(vendor, "GenuineIntel")) { 00387 unsigned int corecpuinfo[4] = { 0 }; 00388 wkf_cpuid(4, 0, corecpuinfo); 00389 physicalcores = ((corecpuinfo[0] >> 26) & 0x3f) + 1; 00390 } else if (!strcmp(vendor, "AuthenticAMD")) { 00391 unsigned int corecpuinfo[4] = { 0 }; 00392 wkf_cpuid(0x80000008, 0, corecpuinfo); 00393 physicalcores = (corecpuinfo[2] & 0xFF) + 1; 00394 } 00395 00396 printf("cpuinfo: %d / %d vend: %s\n", logicalcores, physicalcores, vendor); 00397 00398 smtdepth = logicalcores / physicalcores; 00399 #endif 00400 } 00401 00402 #elif defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1300) 00403 00404 // https://software.intel.com/content/www/us/en/develop/documentation/cpp-compiler-developer-guide-and-reference/top/compiler-reference/intrinsics/intrinsics-for-all-intel-architectures/may-i-use-cpu-feature.html 00405 flags = 0; 00406 flags |= _may_i_use_cpu_feature(_FEATURE_SSE2) * CPU_SSE2; 00407 flags |= _may_i_use_cpu_feature(_FEATURE_SSE4_1) * CPU_SSE4_1; 00408 flags |= _may_i_use_cpu_feature(_FEATURE_AVX) * CPU_AVX; 00409 flags |= _may_i_use_cpu_feature(_FEATURE_AVX2) * CPU_AVX2; 00410 flags |= _may_i_use_cpu_feature(_FEATURE_FMA) * CPU_FMA; 00411 flags |= _may_i_use_cpu_feature(_FEATURE_AVX512F) * CPU_AVX512F; 00412 flags |= _may_i_use_cpu_feature(_FEATURE_AVX512CD) * CPU_AVX512CD; 00413 flags |= _may_i_use_cpu_feature(_FEATURE_AVX512ER) * CPU_AVX512ER; 00414 flags |= _may_i_use_cpu_feature(_FEATURE_AVX512PF) * CPU_AVX512PF; 00415 00416 #elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) 00417 00418 // 
https://gcc.gnu.org/onlinedocs/gcc/x86-Built-in-Functions.html 00419 flags = 0; 00420 __builtin_cpu_init(); 00421 flags |= (__builtin_cpu_supports("sse2")!=0) * CPU_SSE2; 00422 flags |= (__builtin_cpu_supports("sse4.1")!=0) * CPU_SSE4_1; 00423 flags |= (__builtin_cpu_supports("avx")!=0) * CPU_AVX; 00424 flags |= (__builtin_cpu_supports("avx2")!=0) * CPU_AVX2; 00425 flags |= (__builtin_cpu_supports("fma")!=0) * CPU_FMA; 00426 flags |= (__builtin_cpu_supports("avx512f")!=0) * CPU_AVX512F; 00427 flags |= (__builtin_cpu_supports("avx512cd")!=0) * CPU_AVX512CD; 00428 flags |= (__builtin_cpu_supports("avx512er")!=0) * CPU_AVX512ER; 00429 flags |= (__builtin_cpu_supports("avx512pf")!=0) * CPU_AVX512PF; 00430 00431 #elif defined(__linux) && (defined(ARCH_LINUXARM64) || defined(__ARM_ARCH_ISA_A64) || defined(__ARM_NEON)) 00432 00433 // https://golang.org/src/internal/cpu/cpu_arm64.go 00434 // https://code.woboq.org/qt5/qtbase/src/corelib/tools/qsimd.cpp.html 00435 // https://www.kernel.org/doc/html/latest/arm64/elf_hwcaps.html 00436 // https://man7.org/linux/man-pages/man3/getauxval.3.html 00437 // https://lists.cs.columbia.edu/pipermail/kvmarm/2017-August/026715.html 00438 unsigned long auxval1=0; 00439 // unsigned long auxval2=0; 00440 auxval1 = getauxval(AT_HWCAP); 00441 // auxval2 = getauxval(AT_HWCAP2); 00442 // printf("WKFThreadsARM: %016lx %016lx\n", auxval1, auxval2); 00443 00444 flags = 0; 00445 flags |= ((auxval1 & HWCAP_FP) != 0) * CPU_ARM64_FP; 00446 00447 flags |= ((auxval1 & HWCAP_ASIMD) != 0) * CPU_ARM64_ASIMD; 00448 flags |= ((auxval1 & HWCAP_ASIMDHP) != 0) * CPU_ARM64_ASIMDHP; 00449 flags |= ((auxval1 & HWCAP_ASIMDRDM) != 0) * CPU_ARM64_ASIMDRDM; 00450 flags |= ((auxval1 & HWCAP_ASIMDDP) != 0) * CPU_ARM64_ASIMDDP; 00451 flags |= ((auxval1 & HWCAP_ASIMDFHM) != 0) * CPU_ARM64_ASIMDFHM; 00452 00453 flags |= ((auxval1 & HWCAP_SVE) != 0) * CPU_ARM64_SVE; 00454 00455 flags |= ((auxval1 & HWCAP_AES) != 0) * CPU_ARM64_AES; 00456 flags |= ((auxval1 & HWCAP_CRC32) != 
0) * CPU_ARM64_CRC32; 00457 flags |= ((auxval1 & HWCAP_SHA1) != 0) * CPU_ARM64_SHA1; 00458 flags |= ((auxval1 & HWCAP_SHA2) != 0) * CPU_ARM64_SHA2; 00459 flags |= ((auxval1 & HWCAP_SHA3) != 0) * CPU_ARM64_SHA3; 00460 flags |= ((auxval1 & HWCAP_SHA512) != 0) * CPU_ARM64_SHA512; 00461 #endif 00462 00463 #if defined (WKF_INTERNAL_ENABLE_CPUCAP_BAILOUT) 00464 nocpuinfo: 00465 #endif 00466 cpucaps->flags = flags; 00467 cpucaps->smtdepth = smtdepth; 00468 00469 if (flags == CPU_UNKNOWN) 00470 return 1; 00471 00472 return 0; 00473 } 00474 00475 00476 int wkf_cpu_smt_depth(void) { 00477 int smtdepth = CPU_SMTDEPTH_UNKNOWN; 00478 00479 #if defined(WKF_USEINTCPUID) && (defined(__GNUC__) || defined(__INTEL_COMPILER)) && (defined(__i386__) || defined(__x86_64__)) 00480 // x86 examples: 00481 // https://software.intel.com/en-us/articles/methods-to-utilize-intels-hyper-threading-technology-with-linux 00482 // https://stackoverflow.com/questions/2901694/how-to-detect-the-number-of-physical-processors-cores-on-windows-mac-and-linu 00483 wkf_cpu_caps_t cpucaps; 00484 if (!wkf_cpu_capability_flags(&cpucaps)) { 00485 smtdepth = cpucaps.smtdepth; 00486 } 00487 #endif 00488 00489 return smtdepth; 00490 } 00491 00492 00493 int * wkf_cpu_affinitylist(int *cpuaffinitycount) { 00494 int *affinitylist = NULL; 00495 *cpuaffinitycount = -1; /* return count -1 if unimplemented or err occurs */ 00496 00497 /* Win32 process affinity mask query */ 00498 #if 0 && defined(_MSC_VER) 00499 /* XXX untested, but based on the linux code, may work with a few tweaks */ 00500 HANDLE myproc = GetCurrentProcess(); /* returns a psuedo-handle */ 00501 DWORD affinitymask, sysaffinitymask; 00502 00503 if (!GetProcessAffinityMask(myproc, &affinitymask, &sysaffinitymask)) { 00504 /* count length of affinity list */ 00505 int affinitycount=0; 00506 int i; 00507 for (i=0; i<31; i++) { 00508 affinitycount += (affinitymask >> i) & 0x1; 00509 } 00510 00511 /* build affinity list */ 00512 if (affinitycount > 0) { 00513 
affinitylist = (int *) malloc(affinitycount * sizeof(int)); 00514 if (affinitylist == NULL) 00515 return NULL; 00516 00517 int curcount = 0; 00518 for (i=0; i<CPU_SETSIZE; i++) { 00519 if (CPU_ISSET(i, &affinitymask)) { 00520 affinitylist[curcount] = i; 00521 curcount++; 00522 } 00523 } 00524 } 00525 00526 *cpuaffinitycount = affinitycount; /* return final affinity list */ 00527 } 00528 #endif 00529 00530 /* Linux process affinity mask query */ 00531 #if defined(__linux) 00532 00533 /* protect ourselves from some older Linux distros */ 00534 #if defined(CPU_SETSIZE) 00535 int i; 00536 cpu_set_t affinitymask; 00537 int affinitycount=0; 00538 00539 /* PID 0 refers to the current process */ 00540 if (sched_getaffinity(0, sizeof(affinitymask), &affinitymask) < 0) { 00541 perror("wkf_cpu_affinitylist: sched_getaffinity"); 00542 return NULL; 00543 } 00544 00545 /* count length of affinity list */ 00546 for (i=0; i<CPU_SETSIZE; i++) { 00547 affinitycount += CPU_ISSET(i, &affinitymask); 00548 } 00549 00550 /* build affinity list */ 00551 if (affinitycount > 0) { 00552 affinitylist = (int *) malloc(affinitycount * sizeof(int)); 00553 if (affinitylist == NULL) 00554 return NULL; 00555 00556 int curcount = 0; 00557 for (i=0; i<CPU_SETSIZE; i++) { 00558 if (CPU_ISSET(i, &affinitymask)) { 00559 affinitylist[curcount] = i; 00560 curcount++; 00561 } 00562 } 00563 } 00564 00565 *cpuaffinitycount = affinitycount; /* return final affinity list */ 00566 #endif 00567 #endif 00568 00569 /* MacOS X 10.5.x has a CPU affinity query/set capability finally */ 00570 /* http://developer.apple.com/releasenotes/Performance/RN-AffinityAPI/ */ 00571 00572 /* Solaris and HP-UX use pset_bind() and related functions, and they */ 00573 /* don't use the single-level mask-based scheduling mechanism that */ 00574 /* the others, use. 
Instead, they use a hierarchical tree of */ 00575 /* processor sets and processes float within those, or are tied to one */ 00576 /* processor that's a member of a particular set. */ 00577 00578 return affinitylist; 00579 } 00580 00581 00582 int wkf_thread_set_self_cpuaffinity(int cpu) { 00583 int status=-1; /* unsupported by default */ 00584 00585 #ifdef WKFTHREADS 00586 00587 #if defined(__linux) && defined(CPU_ZERO) && defined(CPU_SET) 00588 #if defined(__MIC__) 00589 /* XXX this is available on Intel MIC */ 00590 /* XXX this code is too new even for RHEL4, though it runs on Fedora 7 */ 00591 /* and other newer revs. */ 00592 /* NPTL systems can assign per-thread affinities this way */ 00593 cpu_set_t affinitymask; 00594 CPU_ZERO(&affinitymask); 00595 CPU_SET(cpu, &affinitymask); 00596 status = pthread_setaffinity_np(pthread_self(), sizeof(affinitymask), &affinitymask); 00597 #else 00598 /* non-NPTL systems based on the clone() API must use this method */ 00599 cpu_set_t affinitymask; 00600 CPU_ZERO(&affinitymask); 00601 CPU_SET(cpu, &affinitymask); 00602 00603 /* PID 0 refers to the current process */ 00604 if ((status=sched_setaffinity(0, sizeof(affinitymask), &affinitymask)) < 0) { 00605 perror("wkf_thread_set_self_cpuaffinitylist: sched_setaffinity"); 00606 return status; 00607 } 00608 #endif 00609 00610 /* call sched_yield() so new affinity mask takes effect immediately */ 00611 sched_yield(); 00612 #endif /* linux */ 00613 00614 /* MacOS X 10.5.x has a CPU affinity query/set capability finally */ 00615 /* http://developer.apple.com/releasenotes/Performance/RN-AffinityAPI/ */ 00616 00617 /* Solaris and HP-UX use pset_bind() and related functions, and they */ 00618 /* don't use the single-level mask-based scheduling mechanism that */ 00619 /* the others, use. Instead, they use a hierarchical tree of */ 00620 /* processor sets and processes float within those, or are tied to one */ 00621 /* processor that's a member of a particular set. 
*/ 00622 #endif 00623 00624 return status; 00625 } 00626 00627 00628 int wkf_thread_setconcurrency(int nthr) { 00629 int status=0; 00630 00631 #ifdef WKFTHREADS 00632 #if defined(__sun) 00633 #ifdef USEPOSIXTHREADS 00634 status = pthread_setconcurrency(nthr); 00635 #else 00636 status = thr_setconcurrency(nthr); 00637 #endif 00638 #endif /* SunOS */ 00639 00640 #if defined(__irix) || defined(_AIX) 00641 status = pthread_setconcurrency(nthr); 00642 #endif 00643 #endif /* WKFTHREADS */ 00644 00645 return status; 00646 } 00647 00648 00649 /* 00650 * Thread creation/management 00651 */ 00653 typedef void * (*WKFTHREAD_START_ROUTINE)(void *); 00654 00655 int wkf_thread_create(wkf_thread_t * thr, void * fctn(void *), void * arg) { 00656 int status=0; 00657 00658 #ifdef WKFTHREADS 00659 #ifdef _MSC_VER 00660 DWORD tid; /* thread id, msvc only */ 00661 *thr = CreateThread(NULL, 8192, (LPTHREAD_START_ROUTINE) fctn, arg, 0, &tid); 00662 if (*thr == NULL) { 00663 status = -1; 00664 } 00665 // If we want to spawn the thread "detached" without ever joining it in the 00666 // future, such that it's totally on its own, we need to call CloseHandle() 00667 // immediately on creation so the handle doesn't leak. If we need to join 00668 // later, we call CloseHandle() at the end of the join sync-up. 
  // CloseHandle(thr);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
#if defined(_AIX)
  /* AIX schedule threads in system scope by default, have to ask explicitly */
  {
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    status = pthread_create(thr, &attr, (WKFTHREAD_START_ROUTINE)fctn, arg);
    pthread_attr_destroy(&attr);
  }
#elif defined(__PARAGON__)
  status = pthread_create(thr, pthread_attr_default, fctn, arg);
#else
  status = pthread_create(thr, NULL, (WKFTHREAD_START_ROUTINE)fctn, arg);
#endif
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = thr_create(NULL, 0, (WKFTHREAD_START_ROUTINE)fctn, arg, 0, thr);
#endif /* USEUITHREADS */
#endif /* WKFTHREADS */

  return status;
}


/* Join (wait for completion of) thread 'thr'.  The thread's exit value */
/* is stored through 'stat' where the platform thread API supports it.  */
int wkf_thread_join(wkf_thread_t thr, void ** stat) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  DWORD wstatus = 0;

  wstatus = WAIT_TIMEOUT;

  /* NOTE(review): loops until WAIT_OBJECT_0; a WAIT_FAILED return would */
  /* spin forever here -- consider breaking out on WAIT_FAILED.          */
  while (wstatus != WAIT_OBJECT_0) {
    wstatus = WaitForSingleObject(thr, INFINITE);
  }
  // Windows won't free the thread handle until both the thread terminates
  // AND all existing handles to it are explicitly closed
  CloseHandle(thr);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_join(thr, stat);
#endif /* USEPOSIXTHREADS */

#ifdef USEUITHREADS
  status = thr_join(thr, NULL, stat);
#endif /* USEPOSIXTHREADS */
#endif /* WKFTHREADS */

  return status;
}


/*
 * Mutexes
 */
/* Initialize mutex 'mp'.  Returns 0 on success. */
int wkf_mutex_init(wkf_mutex_t * mp) {
  int status=0;

#ifdef WKFTHREADS
#ifdef _MSC_VER
  InitializeCriticalSection(mp);
#endif /* _MSC_VER */

#ifdef USEPOSIXTHREADS
  status = pthread_mutex_init(mp, 0);
#endif /* USEPOSIXTHREADS */ 00742 00743 #ifdef USEUITHREADS 00744 status = mutex_init(mp, USYNC_THREAD, NULL); 00745 #endif /* USEUITHREADS */ 00746 #endif /* WKFTHREADS */ 00747 00748 return status; 00749 } 00750 00751 00752 int wkf_mutex_lock(wkf_mutex_t * mp) { 00753 int status=0; 00754 00755 #ifdef WKFTHREADS 00756 #ifdef _MSC_VER 00757 EnterCriticalSection(mp); 00758 #endif /* _MSC_VER */ 00759 00760 #ifdef USEPOSIXTHREADS 00761 status = pthread_mutex_lock(mp); 00762 #endif /* USEPOSIXTHREADS */ 00763 00764 #ifdef USEUITHREADS 00765 status = mutex_lock(mp); 00766 #endif /* USEUITHREADS */ 00767 #endif /* WKFTHREADS */ 00768 00769 return status; 00770 } 00771 00772 00773 int wkf_mutex_trylock(wkf_mutex_t * mp) { 00774 int status=0; 00775 00776 #ifdef WKFTHREADS 00777 #ifdef _MSC_VER 00778 #if defined(WKFUSENEWWIN32APIS) 00779 /* TryEnterCriticalSection() is only available on newer */ 00780 /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400 */ 00781 status = (!(TryEnterCriticalSection(mp))); 00782 #endif 00783 #endif /* _MSC_VER */ 00784 00785 #ifdef USEPOSIXTHREADS 00786 status = (pthread_mutex_lock(mp) != 0); 00787 #endif /* USEPOSIXTHREADS */ 00788 #endif /* WKFTHREADS */ 00789 00790 return status; 00791 } 00792 00793 00794 int wkf_mutex_spin_lock(wkf_mutex_t * mp) { 00795 int status=0; 00796 00797 #ifdef WKFTHREADS 00798 #ifdef _MSC_VER 00799 #if defined(WKFUSENEWWIN32APIS) 00800 /* TryEnterCriticalSection() is only available on newer */ 00801 /* versions of Win32: _WIN32_WINNT/WINVER >= 0x0400 */ 00802 while (!TryEnterCriticalSection(mp)); 00803 #else 00804 EnterCriticalSection(mp); 00805 #endif 00806 #endif /* _MSC_VER */ 00807 00808 #ifdef USEPOSIXTHREADS 00809 while ((status = pthread_mutex_trylock(mp)) != 0); 00810 #endif /* USEPOSIXTHREADS */ 00811 #endif /* WKFTHREADS */ 00812 00813 return status; 00814 } 00815 00816 00817 int wkf_mutex_unlock(wkf_mutex_t * mp) { 00818 int status=0; 00819 00820 #ifdef WKFTHREADS 00821 #ifdef _MSC_VER 00822 
LeaveCriticalSection(mp); 00823 #endif /* _MSC_VER */ 00824 00825 #ifdef USEPOSIXTHREADS 00826 status = pthread_mutex_unlock(mp); 00827 #endif /* USEPOSIXTHREADS */ 00828 00829 #ifdef USEUITHREADS 00830 status = mutex_unlock(mp); 00831 #endif /* USEUITHREADS */ 00832 #endif /* WKFTHREADS */ 00833 00834 return status; 00835 } 00836 00837 00838 int wkf_mutex_destroy(wkf_mutex_t * mp) { 00839 int status=0; 00840 00841 #ifdef WKFTHREADS 00842 #ifdef _MSC_VER 00843 DeleteCriticalSection(mp); 00844 #endif /* _MSC_VER */ 00845 00846 #ifdef USEPOSIXTHREADS 00847 status = pthread_mutex_destroy(mp); 00848 #endif /* USEPOSIXTHREADS */ 00849 00850 #ifdef USEUITHREADS 00851 status = mutex_destroy(mp); 00852 #endif /* USEUITHREADS */ 00853 #endif /* WKFTHREADS */ 00854 00855 return status; 00856 } 00857 00858 00859 /* 00860 * Condition variables 00861 */ 00862 int wkf_cond_init(wkf_cond_t * cvp) { 00863 int status=0; 00864 00865 #ifdef WKFTHREADS 00866 #ifdef _MSC_VER 00867 #if defined(WKFUSEWIN2008CONDVARS) 00868 InitializeConditionVariable(cvp); 00869 #else 00870 /* XXX not implemented */ 00871 cvp->waiters = 0; 00872 00873 /* Create an auto-reset event. */ 00874 cvp->events[WKF_COND_SIGNAL] = CreateEvent(NULL, /* no security */ 00875 FALSE, /* auto-reset event */ 00876 FALSE, /* non-signaled initially */ 00877 NULL); /* unnamed */ 00878 00879 /* Create a manual-reset event. 
*/ 00880 cvp->events[WKF_COND_BROADCAST] = CreateEvent(NULL, /* no security */ 00881 TRUE, /* manual-reset */ 00882 FALSE, /* non-signaled initially */ 00883 NULL); /* unnamed */ 00884 #endif 00885 #endif /* _MSC_VER */ 00886 00887 #ifdef USEPOSIXTHREADS 00888 status = pthread_cond_init(cvp, NULL); 00889 #endif /* USEPOSIXTHREADS */ 00890 #ifdef USEUITHREADS 00891 status = cond_init(cvp, USYNC_THREAD, NULL); 00892 #endif 00893 #endif /* WKFTHREADS */ 00894 00895 return status; 00896 } 00897 00898 int wkf_cond_destroy(wkf_cond_t * cvp) { 00899 int status=0; 00900 00901 #ifdef WKFTHREADS 00902 #ifdef _MSC_VER 00903 #if defined(WKFUSEWIN2008CONDVARS) 00904 /* XXX not implemented */ 00905 #else 00906 CloseHandle(cvp->events[WKF_COND_SIGNAL]); 00907 CloseHandle(cvp->events[WKF_COND_BROADCAST]); 00908 #endif 00909 #endif /* _MSC_VER */ 00910 00911 #ifdef USEPOSIXTHREADS 00912 status = pthread_cond_destroy(cvp); 00913 #endif /* USEPOSIXTHREADS */ 00914 #ifdef USEUITHREADS 00915 status = cond_destroy(cvp); 00916 #endif 00917 #endif /* WKFTHREADS */ 00918 00919 return status; 00920 } 00921 00922 int wkf_cond_wait(wkf_cond_t * cvp, wkf_mutex_t * mp) { 00923 int status=0; 00924 #if defined(WKFTHREADS) && defined(_MSC_VER) 00925 int result=0; 00926 LONG last_waiter; 00927 LONG my_waiter; 00928 #endif 00929 00930 #ifdef WKFTHREADS 00931 #ifdef _MSC_VER 00932 #if defined(WKFUSEWIN2008CONDVARS) 00933 SleepConditionVariableCS(cvp, mp, INFINITE) 00934 #else 00935 #if !defined(WKFUSEINTERLOCKEDATOMICOPS) 00936 EnterCriticalSection(&cvp->waiters_lock); 00937 cvp->waiters++; 00938 LeaveCriticalSection(&cvp->waiters_lock); 00939 #else 00940 InterlockedIncrement(&cvp->waiters); 00941 #endif 00942 00943 LeaveCriticalSection(mp); /* SetEvent() keeps state, avoids lost wakeup */ 00944 00945 /* Wait either a single or broadcast even to become signalled */ 00946 result = WaitForMultipleObjects(2, cvp->events, FALSE, INFINITE); 00947 00948 #if !defined(WKFUSEINTERLOCKEDATOMICOPS) 00949 
EnterCriticalSection (&cvp->waiters_lock); 00950 cvp->waiters--; 00951 last_waiter = 00952 ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && cvp->waiters == 0); 00953 LeaveCriticalSection (&cvp->waiters_lock); 00954 #else 00955 my_waiter = InterlockedDecrement(&cvp->waiters); 00956 last_waiter = 00957 ((result == (WAIT_OBJECT_0 + WKF_COND_BROADCAST)) && my_waiter == 0); 00958 #endif 00959 00960 /* Some thread called cond_broadcast() */ 00961 if (last_waiter) 00962 /* We're the last waiter to be notified or to stop waiting, so */ 00963 /* reset the manual event. */ 00964 ResetEvent(cvp->events[WKF_COND_BROADCAST]); 00965 00966 EnterCriticalSection(mp); 00967 #endif 00968 #endif /* _MSC_VER */ 00969 00970 #ifdef USEPOSIXTHREADS 00971 status = pthread_cond_wait(cvp, mp); 00972 #endif /* USEPOSIXTHREADS */ 00973 #ifdef USEUITHREADS 00974 status = cond_wait(cvp, mp); 00975 #endif 00976 #endif /* WKFTHREADS */ 00977 00978 return status; 00979 } 00980 00981 int wkf_cond_signal(wkf_cond_t * cvp) { 00982 int status=0; 00983 00984 #ifdef WKFTHREADS 00985 #ifdef _MSC_VER 00986 #if defined(WKFUSEWIN2008CONDVARS) 00987 WakeConditionVariable(cvp); 00988 #else 00989 #if !defined(WKFUSEINTERLOCKEDATOMICOPS) 00990 EnterCriticalSection(&cvp->waiters_lock); 00991 int have_waiters = (cvp->waiters > 0); 00992 LeaveCriticalSection(&cvp->waiters_lock); 00993 if (have_waiters) 00994 SetEvent (cvp->events[WKF_COND_SIGNAL]); 00995 #else 00996 if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0) 00997 SetEvent(cvp->events[WKF_COND_SIGNAL]); 00998 #endif 00999 #endif 01000 #endif /* _MSC_VER */ 01001 01002 #ifdef USEPOSIXTHREADS 01003 status = pthread_cond_signal(cvp); 01004 #endif /* USEPOSIXTHREADS */ 01005 #ifdef USEUITHREADS 01006 status = cond_signal(cvp); 01007 #endif 01008 #endif /* WKFTHREADS */ 01009 01010 return status; 01011 } 01012 01013 int wkf_cond_broadcast(wkf_cond_t * cvp) { 01014 int status=0; 01015 01016 #ifdef WKFTHREADS 01017 #ifdef _MSC_VER 01018 #if 
defined(WKFUSEWIN2008CONDVARS) 01019 WakeAllConditionVariable(cvp); 01020 #else 01021 #if !defined(WKFUSEINTERLOCKEDATOMICOPS) 01022 EnterCriticalSection(&cvp->waiters_lock); 01023 int have_waiters = (cvp->waiters > 0); 01024 LeaveCriticalSection(&cvp->waiters_lock); 01025 if (have_waiters) 01026 SetEvent(cvp->events[WKF_COND_BROADCAST]); 01027 #else 01028 if (InterlockedExchangeAdd(&cvp->waiters, 0) > 0) 01029 SetEvent(cvp->events[WKF_COND_BROADCAST]); 01030 #endif 01031 01032 #endif 01033 #endif /* _MSC_VER */ 01034 01035 #ifdef USEPOSIXTHREADS 01036 status = pthread_cond_broadcast(cvp); 01037 #endif /* USEPOSIXTHREADS */ 01038 #ifdef USEUITHREADS 01039 status = cond_broadcast(cvp); 01040 #endif 01041 #endif /* WKFTHREADS */ 01042 01043 return status; 01044 } 01045 01046 01047 /* 01048 * Atomic integer ops -- Ideally implemented by fast machine instruction 01049 * fetch-and-add operations. Worst-case implementation 01050 * based on mutex locks and math ops if no other choice. 01051 */ 01052 01053 int wkf_atomic_int_init(wkf_atomic_int_t * atomp, int val) { 01054 memset(atomp, 0, sizeof(wkf_atomic_int_t)); 01055 #ifdef WKFTHREADS 01056 #if defined(USEGCCATOMICS) 01057 /* nothing to do here */ 01058 #elif defined(USENETBSDATOMICS) 01059 /* nothing to do here */ 01060 #elif defined(USESOLARISATOMICS) 01061 /* nothing to do here */ 01062 #elif defined(USEWIN32ATOMICS) 01063 /* nothing to do here */ 01064 #else /* use mutexes */ 01065 wkf_mutex_init(&atomp->lock); 01066 #endif 01067 #else 01068 /* nothing to do for non-threaded builds */ 01069 #endif 01070 atomp->val = val; 01071 01072 return 0; 01073 } 01074 01075 01076 int wkf_atomic_int_destroy(wkf_atomic_int_t * atomp) { 01077 #ifdef WKFTHREADS 01078 #if defined(USEGCCATOMICS) 01079 /* nothing to do here */ 01080 #elif defined(USENETBSDATOMICS) 01081 /* nothing to do here */ 01082 #elif defined(USESOLARISATOMICS) 01083 /* nothing to do here */ 01084 #elif defined(USEWIN32ATOMICS) 01085 /* nothing to do here */ 
01086 #else /* use mutexes */ 01087 wkf_mutex_destroy(&atomp->lock); 01088 #endif 01089 #else 01090 /* nothing to do for non-threaded builds */ 01091 #endif 01092 01093 return 0; 01094 } 01095 01096 01097 int wkf_atomic_int_set(wkf_atomic_int_t * atomp, int val) { 01098 int retval; 01099 01100 #ifdef WKFTHREADS 01101 #if defined(USEGCCATOMICS) 01102 /* nothing special to do here? */ 01103 atomp->val = val; 01104 retval = val; 01105 #elif defined(USENETBSDATOMICS) 01106 /* nothing special to do here? */ 01107 atomp->val = val; 01108 retval = val; 01109 #elif defined(USESOLARISATOMICS) 01110 /* nothing special to do here? */ 01111 atomp->val = val; 01112 retval = val; 01113 #elif defined(USEWIN32ATOMICS) 01114 /* nothing special to do here? */ 01115 atomp->val = val; 01116 retval = val; 01117 #else /* use mutexes */ 01118 wkf_mutex_lock(&atomp->lock); 01119 atomp->val = val; 01120 retval = atomp->val; 01121 wkf_mutex_unlock(&atomp->lock); 01122 #endif 01123 #else 01124 /* nothing special to do here */ 01125 atomp->val = val; 01126 retval = atomp->val; 01127 #endif 01128 01129 return retval; 01130 } 01131 01132 01133 int wkf_atomic_int_get(wkf_atomic_int_t * atomp) { 01134 int retval; 01135 01136 #ifdef WKFTHREADS 01137 #if defined(USEGCCATOMICS) 01138 /* nothing special to do here? */ 01139 retval = atomp->val; 01140 #elif defined(USENETBSDATOMICS) 01141 /* nothing special to do here? */ 01142 retval = atomp->val; 01143 #elif defined(USESOLARISATOMICS) 01144 /* nothing special to do here? */ 01145 retval = atomp->val; 01146 #elif defined(USEWIN32ATOMICS) 01147 /* nothing special to do here? 
*/ 01148 retval = atomp->val; 01149 #else /* use mutexes */ 01150 wkf_mutex_lock(&atomp->lock); 01151 retval = atomp->val; 01152 wkf_mutex_unlock(&atomp->lock); 01153 #endif 01154 #else 01155 /* nothing special to do here */ 01156 retval = atomp->val; 01157 #endif 01158 01159 return retval; 01160 } 01161 01162 int wkf_atomic_int_fetch_and_add(wkf_atomic_int_t * atomp, int inc) { 01163 #ifdef WKFTHREADS 01164 #if defined(USEGCCATOMICS) 01165 return __sync_fetch_and_add(&atomp->val, inc); 01166 #elif defined(USENETBSDATOMICS) 01167 /* value returned is the new value, so we have to subtract it off again */ 01168 return atomic_add_int_nv(&atomp->val, inc) - inc; 01169 #elif defined(USESOLARISATOMICS) 01170 /* value returned is the new value, so we have to subtract it off again */ 01171 return atomic_add_int_nv(&atomp->val, inc) - inc; 01172 #elif defined(USEWIN32ATOMICS) 01173 return InterlockedExchangeAdd(&atomp->val, inc); 01174 #else /* use mutexes */ 01175 int retval; 01176 wkf_mutex_lock(&atomp->lock); 01177 retval = atomp->val; 01178 atomp->val+=inc; 01179 wkf_mutex_unlock(&atomp->lock); 01180 return retval; 01181 #endif 01182 #else 01183 int retval = atomp->val; 01184 atomp->val+=inc; 01185 return retval; 01186 #endif 01187 } 01188 01189 01190 int wkf_atomic_int_add_and_fetch(wkf_atomic_int_t * atomp, int inc) { 01191 #ifdef WKFTHREADS 01192 #if defined(USEGCCATOMICS) 01193 return __sync_add_and_fetch(&atomp->val, inc); 01194 #elif defined(USENETBSDATOMICS) 01195 return atomic_add_int_nv(&atomp->val, inc); 01196 #elif defined(USESOLARISATOMICS) 01197 return atomic_add_int_nv(&atomp->val, inc); 01198 #elif defined(USEWIN32ATOMICS) 01199 /* value returned is the old value, so we have to add it on again */ 01200 return InterlockedExchangeAdd(&atomp->val, inc) + inc; 01201 #else /* use mutexes */ 01202 int retval; 01203 wkf_mutex_lock(&atomp->lock); 01204 atomp->val+=inc; 01205 retval = atomp->val; 01206 wkf_mutex_unlock(&atomp->lock); 01207 return retval; 01208 
#endif 01209 #else 01210 int retval; 01211 atomp->val+=inc; 01212 retval = atomp->val; 01213 return retval; 01214 #endif 01215 } 01216 01217 01218 01219 /* 01220 * Reader/Writer locks -- slower than mutexes but good for some purposes 01221 */ 01222 int wkf_rwlock_init(wkf_rwlock_t * rwp) { 01223 int status=0; 01224 01225 #ifdef WKFTHREADS 01226 #ifdef _MSC_VER 01227 wkf_mutex_init(&rwp->lock); 01228 wkf_cond_init(&rwp->rdrs_ok); 01229 wkf_cond_init(&rwp->wrtr_ok); 01230 rwp->rwlock = 0; 01231 rwp->waiting_writers = 0; 01232 #endif 01233 01234 #ifdef USEPOSIXTHREADS 01235 pthread_mutex_init(&rwp->lock, NULL); 01236 pthread_cond_init(&rwp->rdrs_ok, NULL); 01237 pthread_cond_init(&rwp->wrtr_ok, NULL); 01238 rwp->rwlock = 0; 01239 rwp->waiting_writers = 0; 01240 #endif /* USEPOSIXTHREADS */ 01241 01242 #ifdef USEUITHREADS 01243 status = rwlock_init(rwp, USYNC_THREAD, NULL); 01244 #endif /* USEUITHREADS */ 01245 #endif /* WKFTHREADS */ 01246 01247 return status; 01248 } 01249 01250 01251 int wkf_rwlock_readlock(wkf_rwlock_t * rwp) { 01252 int status=0; 01253 01254 #ifdef WKFTHREADS 01255 #ifdef _MSC_VER 01256 wkf_mutex_lock(&rwp->lock); 01257 while (rwp->rwlock < 0 || rwp->waiting_writers) 01258 wkf_cond_wait(&rwp->rdrs_ok, &rwp->lock); 01259 rwp->rwlock++; /* increment number of readers holding the lock */ 01260 wkf_mutex_unlock(&rwp->lock); 01261 #endif 01262 01263 #ifdef USEPOSIXTHREADS 01264 pthread_mutex_lock(&rwp->lock); 01265 while (rwp->rwlock < 0 || rwp->waiting_writers) 01266 pthread_cond_wait(&rwp->rdrs_ok, &rwp->lock); 01267 rwp->rwlock++; /* increment number of readers holding the lock */ 01268 pthread_mutex_unlock(&rwp->lock); 01269 #endif /* USEPOSIXTHREADS */ 01270 01271 #ifdef USEUITHREADS 01272 status = rw_rdlock(rwp); 01273 #endif /* USEUITHREADS */ 01274 #endif /* WKFTHREADS */ 01275 01276 return status; 01277 } 01278 01279 01280 int wkf_rwlock_writelock(wkf_rwlock_t * rwp) { 01281 int status=0; 01282 01283 #ifdef WKFTHREADS 01284 #ifdef _MSC_VER 
01285 wkf_mutex_lock(&rwp->lock); 01286 while (rwp->rwlock != 0) { 01287 rwp->waiting_writers++; 01288 wkf_cond_wait(&rwp->wrtr_ok, &rwp->lock); 01289 rwp->waiting_writers--; 01290 } 01291 rwp->rwlock=-1; 01292 wkf_mutex_unlock(&rwp->lock); 01293 #endif 01294 01295 #ifdef USEPOSIXTHREADS 01296 pthread_mutex_lock(&rwp->lock); 01297 while (rwp->rwlock != 0) { 01298 rwp->waiting_writers++; 01299 pthread_cond_wait(&rwp->wrtr_ok, &rwp->lock); 01300 rwp->waiting_writers--; 01301 } 01302 rwp->rwlock=-1; 01303 pthread_mutex_unlock(&rwp->lock); 01304 #endif /* USEPOSIXTHREADS */ 01305 01306 #ifdef USEUITHREADS 01307 status = rw_wrlock(rwp); 01308 #endif /* USEUITHREADS */ 01309 #endif /* WKFTHREADS */ 01310 01311 return status; 01312 } 01313 01314 01315 int wkf_rwlock_unlock(wkf_rwlock_t * rwp) { 01316 int status=0; 01317 01318 #ifdef WKFTHREADS 01319 #ifdef _MSC_VER 01320 int ww, wr; 01321 wkf_mutex_lock(&rwp->lock); 01322 if (rwp->rwlock > 0) { 01323 rwp->rwlock--; 01324 } else { 01325 rwp->rwlock = 0; 01326 } 01327 ww = (rwp->waiting_writers && rwp->rwlock == 0); 01328 wr = (rwp->waiting_writers == 0); 01329 wkf_mutex_unlock(&rwp->lock); 01330 if (ww) 01331 wkf_cond_signal(&rwp->wrtr_ok); 01332 else if (wr) 01333 wkf_cond_signal(&rwp->rdrs_ok); 01334 #endif 01335 01336 #ifdef USEPOSIXTHREADS 01337 int ww, wr; 01338 pthread_mutex_lock(&rwp->lock); 01339 if (rwp->rwlock > 0) { 01340 rwp->rwlock--; 01341 } else { 01342 rwp->rwlock = 0; 01343 } 01344 ww = (rwp->waiting_writers && rwp->rwlock == 0); 01345 wr = (rwp->waiting_writers == 0); 01346 pthread_mutex_unlock(&rwp->lock); 01347 if (ww) 01348 pthread_cond_signal(&rwp->wrtr_ok); 01349 else if (wr) 01350 pthread_cond_signal(&rwp->rdrs_ok); 01351 #endif /* USEPOSIXTHREADS */ 01352 01353 #ifdef USEUITHREADS 01354 status = rw_unlock(rwp); 01355 #endif /* USEUITHREADS */ 01356 #endif /* WKFTHREADS */ 01357 01358 return status; 01359 } 01360 01361 01362 /* 01363 * Simple counting barrier primitive 01364 */ 01365 wkf_barrier_t * 
wkf_thread_barrier_init(int n_clients) { 01366 wkf_barrier_t *barrier = (wkf_barrier_t *) malloc(sizeof(wkf_barrier_t)); 01367 01368 #ifdef WKFTHREADS 01369 if (barrier != NULL) { 01370 barrier->n_clients = n_clients; 01371 barrier->n_waiting = 0; 01372 barrier->phase = 0; 01373 barrier->sum = 0; 01374 wkf_mutex_init(&barrier->lock); 01375 wkf_cond_init(&barrier->wait_cv); 01376 } 01377 #endif 01378 01379 return barrier; 01380 } 01381 01382 01383 /* When rendering in the CAVE we use a special synchronization */ 01384 /* mode so that shared memory mutexes and condition variables */ 01385 /* will work correctly when accessed from multiple processes. */ 01386 /* Inter-process synchronization involves the kernel to a greater */ 01387 /* degree, so these barriers are substantially more costly to use */ 01388 /* than the ones designed for use within a single-process. */ 01389 int wkf_thread_barrier_init_proc_shared(wkf_barrier_t *barrier, int n_clients) { 01390 #ifdef WKFTHREADS 01391 #ifdef USEPOSIXTHREADS 01392 if (barrier != NULL) { 01393 barrier->n_clients = n_clients; 01394 barrier->n_waiting = 0; 01395 barrier->phase = 0; 01396 barrier->sum = 0; 01397 01398 pthread_mutexattr_t mattr; 01399 pthread_condattr_t cattr; 01400 01401 printf("Setting barriers to have system scope...\n"); 01402 01403 pthread_mutexattr_init(&mattr); 01404 if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { 01405 printf("WARNING: could not set mutex to process shared scope\n"); 01406 } 01407 01408 pthread_condattr_init(&cattr); 01409 if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) != 0) { 01410 printf("WARNING: could not set mutex to process shared scope\n"); 01411 } 01412 01413 pthread_mutex_init(&barrier->lock, &mattr); 01414 pthread_cond_init(&barrier->wait_cv, &cattr); 01415 01416 pthread_condattr_destroy(&cattr); 01417 pthread_mutexattr_destroy(&mattr); 01418 } 01419 #endif 01420 #endif 01421 01422 return 0; 01423 } 01424 01425 01426 void 
wkf_thread_barrier_destroy(wkf_barrier_t *barrier) { 01427 #ifdef WKFTHREADS 01428 wkf_mutex_destroy(&barrier->lock); 01429 wkf_cond_destroy(&barrier->wait_cv); 01430 #endif 01431 free(barrier); 01432 } 01433 01434 01435 int wkf_thread_barrier(wkf_barrier_t *barrier, int increment) { 01436 #ifdef WKFTHREADS 01437 int my_phase; 01438 int my_result; 01439 01440 wkf_mutex_lock(&barrier->lock); 01441 my_phase = barrier->phase; 01442 barrier->sum += increment; 01443 barrier->n_waiting++; 01444 01445 if (barrier->n_waiting == barrier->n_clients) { 01446 barrier->result = barrier->sum; 01447 barrier->sum = 0; 01448 barrier->n_waiting = 0; 01449 barrier->phase = 1 - my_phase; 01450 wkf_cond_broadcast(&barrier->wait_cv); 01451 } 01452 01453 while (barrier->phase == my_phase) { 01454 wkf_cond_wait(&barrier->wait_cv, &barrier->lock); 01455 } 01456 01457 my_result = barrier->result; 01458 01459 wkf_mutex_unlock(&barrier->lock); 01460 01461 return my_result; 01462 #else 01463 return 0; 01464 #endif 01465 } 01466 01467 01468 /* 01469 * Barriers used for sleepable thread pools 01470 */ 01471 /* symmetric run barrier for use within a single process */ 01472 int wkf_thread_run_barrier_init(wkf_run_barrier_t *barrier, int n_clients) { 01473 #ifdef WKFTHREADS 01474 if (barrier != NULL) { 01475 barrier->n_clients = n_clients; 01476 barrier->n_waiting = 0; 01477 barrier->phase = 0; 01478 barrier->fctn = NULL; 01479 01480 wkf_mutex_init(&barrier->lock); 01481 wkf_cond_init(&barrier->wait_cv); 01482 } 01483 #endif 01484 01485 return 0; 01486 } 01487 01488 void wkf_thread_run_barrier_destroy(wkf_run_barrier_t *barrier) { 01489 #ifdef WKFTHREADS 01490 wkf_mutex_destroy(&barrier->lock); 01491 wkf_cond_destroy(&barrier->wait_cv); 01492 #endif 01493 } 01494 01495 01500 void * (*wkf_thread_run_barrier(wkf_run_barrier_t *barrier, 01501 void * fctn(void*), 01502 void * parms, 01503 void **rsltparms))(void *) { 01504 #if defined(WKFTHREADS) 01505 int my_phase; 01506 void * (*my_result)(void*); 
01507 01508 wkf_mutex_lock(&barrier->lock); 01509 my_phase = barrier->phase; 01510 if (fctn != NULL) 01511 barrier->fctn = fctn; 01512 if (parms != NULL) 01513 barrier->parms = parms; 01514 barrier->n_waiting++; 01515 01516 if (barrier->n_waiting == barrier->n_clients) { 01517 barrier->rslt = barrier->fctn; 01518 barrier->rsltparms = barrier->parms; 01519 barrier->fctn = NULL; 01520 barrier->parms = NULL; 01521 barrier->n_waiting = 0; 01522 barrier->phase = 1 - my_phase; 01523 wkf_cond_broadcast(&barrier->wait_cv); 01524 } 01525 01526 while (barrier->phase == my_phase) { 01527 wkf_cond_wait(&barrier->wait_cv, &barrier->lock); 01528 } 01529 01530 my_result = barrier->rslt; 01531 if (rsltparms != NULL) 01532 *rsltparms = barrier->rsltparms; 01533 01534 wkf_mutex_unlock(&barrier->lock); 01535 #else 01536 void * (*my_result)(void*) = fctn; 01537 if (rsltparms != NULL) 01538 *rsltparms = parms; 01539 #endif 01540 01541 return my_result; 01542 } 01543 01544 01546 int wkf_thread_run_barrier_poll(wkf_run_barrier_t *barrier) { 01547 int rc=0; 01548 #if defined(WKFTHREADS) 01549 wkf_mutex_lock(&barrier->lock); 01550 if (barrier->n_waiting == (barrier->n_clients-1)) { 01551 rc=1; 01552 } 01553 wkf_mutex_unlock(&barrier->lock); 01554 #endif 01555 return rc; 01556 } 01557 01558 01559 /* 01560 * task tile stack 01561 */ 01562 int wkf_tilestack_init(wkf_tilestack_t *s, int size) { 01563 if (s == NULL) 01564 return -1; 01565 01566 #if defined(WKFTHREADS) 01567 wkf_mutex_init(&s->mtx); 01568 #endif 01569 01570 s->growthrate = 512; 01571 s->top = -1; 01572 01573 if (size > 0) { 01574 s->size = size; 01575 s->s = (wkf_tasktile_t *) malloc(s->size * sizeof(wkf_tasktile_t)); 01576 } else { 01577 s->size = 0; 01578 s->s = NULL; 01579 } 01580 01581 return 0; 01582 } 01583 01584 01585 void wkf_tilestack_destroy(wkf_tilestack_t *s) { 01586 #if defined(WKFTHREADS) 01587 wkf_mutex_destroy(&s->mtx); 01588 #endif 01589 free(s->s); 01590 s->s = NULL; /* prevent access after free */ 01591 } 
01592 01593 01594 int wkf_tilestack_compact(wkf_tilestack_t *s) { 01595 #if defined(WKFTHREADS) 01596 wkf_mutex_lock(&s->mtx); 01597 #endif 01598 if (s->size > (s->top + 1)) { 01599 int newsize = s->top + 1; 01600 wkf_tasktile_t *tmp = (wkf_tasktile_t *) realloc(s->s, newsize * sizeof(wkf_tasktile_t)); 01601 if (tmp == NULL) { 01602 #if defined(WKFTHREADS) 01603 wkf_mutex_unlock(&s->mtx); 01604 #endif 01605 return -1; /* out of space! */ 01606 } 01607 s->s = tmp; 01608 s->size = newsize; 01609 } 01610 #if defined(WKFTHREADS) 01611 wkf_mutex_unlock(&s->mtx); 01612 #endif 01613 01614 return 0; 01615 } 01616 01617 01618 int wkf_tilestack_push(wkf_tilestack_t *s, const wkf_tasktile_t *t) { 01619 #if defined(WKFTHREADS) 01620 wkf_mutex_lock(&s->mtx); 01621 #endif 01622 s->top++; 01623 if (s->top >= s->size) { 01624 int newsize = s->size + s->growthrate; 01625 wkf_tasktile_t *tmp = (wkf_tasktile_t *) realloc(s->s, newsize * sizeof(wkf_tasktile_t)); 01626 if (tmp == NULL) { 01627 s->top--; 01628 #if defined(WKFTHREADS) 01629 wkf_mutex_unlock(&s->mtx); 01630 #endif 01631 return -1; /* out of space! 
*/ 01632 } 01633 s->s = tmp; 01634 s->size = newsize; 01635 } 01636 01637 s->s[s->top] = *t; /* push onto the stack */ 01638 01639 #if defined(WKFTHREADS) 01640 wkf_mutex_unlock(&s->mtx); 01641 #endif 01642 01643 return 0; 01644 } 01645 01646 01647 int wkf_tilestack_pop(wkf_tilestack_t *s, wkf_tasktile_t *t) { 01648 #if defined(WKFTHREADS) 01649 wkf_mutex_lock(&s->mtx); 01650 #endif 01651 01652 if (s->top < 0) { 01653 #if defined(WKFTHREADS) 01654 wkf_mutex_unlock(&s->mtx); 01655 #endif 01656 return WKF_TILESTACK_EMPTY; /* empty stack */ 01657 } 01658 01659 *t = s->s[s->top]; 01660 s->top--; 01661 01662 #if defined(WKFTHREADS) 01663 wkf_mutex_unlock(&s->mtx); 01664 #endif 01665 01666 return 0; 01667 } 01668 01669 01670 int wkf_tilestack_popall(wkf_tilestack_t *s) { 01671 #if defined(WKFTHREADS) 01672 wkf_mutex_lock(&s->mtx); 01673 #endif 01674 01675 s->top = -1; 01676 01677 #if defined(WKFTHREADS) 01678 wkf_mutex_unlock(&s->mtx); 01679 #endif 01680 01681 return 0; 01682 } 01683 01684 01685 int wkf_tilestack_empty(wkf_tilestack_t *s) { 01686 #if defined(WKFTHREADS) 01687 wkf_mutex_lock(&s->mtx); 01688 #endif 01689 01690 if (s->top < 0) { 01691 #if defined(WKFTHREADS) 01692 wkf_mutex_unlock(&s->mtx); 01693 #endif 01694 return 1; 01695 } 01696 01697 #if defined(WKFTHREADS) 01698 wkf_mutex_unlock(&s->mtx); 01699 #endif 01700 01701 return 0; 01702 } 01703 01704 01705 /* 01706 * shared iterators 01707 */ 01708 01710 int wkf_shared_iterator_init(wkf_shared_iterator_t *it) { 01711 memset(it, 0, sizeof(wkf_shared_iterator_t)); 01712 #if defined(WKFTHREADS) 01713 wkf_mutex_init(&it->mtx); 01714 #endif 01715 return 0; 01716 } 01717 01718 01720 int wkf_shared_iterator_destroy(wkf_shared_iterator_t *it) { 01721 #if defined(WKFTHREADS) 01722 wkf_mutex_destroy(&it->mtx); 01723 #endif 01724 return 0; 01725 } 01726 01727 01729 int wkf_shared_iterator_set(wkf_shared_iterator_t *it, 01730 wkf_tasktile_t *tile) { 01731 #if defined(WKFTHREADS) 01732 wkf_mutex_lock(&it->mtx); 01733 
#endif 01734 it->start = tile->start; 01735 it->current = tile->start; 01736 it->end = tile->end; 01737 it->fatalerror = 0; 01738 #if defined(WKFTHREADS) 01739 wkf_mutex_unlock(&it->mtx); 01740 #endif 01741 return 0; 01742 } 01743 01744 01746 int wkf_shared_iterator_next_tile(wkf_shared_iterator_t *it, int reqsize, 01747 wkf_tasktile_t *tile) { 01748 int rc=WKF_SCHED_CONTINUE; 01749 01750 #if defined(WKFTHREADS) 01751 wkf_mutex_spin_lock(&it->mtx); 01752 #endif 01753 if (!it->fatalerror) { 01754 tile->start=it->current; /* set start to the current work unit */ 01755 it->current+=reqsize; /* increment by the requested tile size */ 01756 tile->end=it->current; /* set the (exclusive) endpoint */ 01757 01758 /* if start is beyond the last work unit, we're done */ 01759 if (tile->start >= it->end) { 01760 tile->start=0; 01761 tile->end=0; 01762 rc = WKF_SCHED_DONE; 01763 } 01764 01765 /* if the endpoint (exclusive) for the requested tile size */ 01766 /* is beyond the last work unit, roll it back as needed */ 01767 if (tile->end > it->end) { 01768 tile->end = it->end; 01769 } 01770 } else { 01771 rc = WKF_SCHED_DONE; 01772 } 01773 #if defined(WKFTHREADS) 01774 wkf_mutex_unlock(&it->mtx); 01775 #endif 01776 01777 return rc; 01778 } 01779 01780 01782 int wkf_shared_iterator_setfatalerror(wkf_shared_iterator_t *it) { 01783 #if defined(WKFTHREADS) 01784 wkf_mutex_spin_lock(&it->mtx); 01785 #endif 01786 it->fatalerror=1; 01787 #if defined(WKFTHREADS) 01788 wkf_mutex_unlock(&it->mtx); 01789 #endif 01790 return 0; 01791 } 01792 01793 01795 int wkf_shared_iterator_getfatalerror(wkf_shared_iterator_t *it) { 01796 int rc=0; 01797 #if defined(WKFTHREADS) 01798 wkf_mutex_lock(&it->mtx); 01799 #endif 01800 if (it->fatalerror) 01801 rc = -1; 01802 #if defined(WKFTHREADS) 01803 wkf_mutex_unlock(&it->mtx); 01804 #endif 01805 return rc; 01806 } 01807 01808 01809 #if defined(WKFTHREADS) 01810 /* 01811 * Thread pool. 
01812 */ 01813 static void * wkf_threadpool_workerproc(void *voidparms) { 01814 void *(*fctn)(void*); 01815 wkf_threadpool_workerdata_t *workerdata = (wkf_threadpool_workerdata_t *) voidparms; 01816 wkf_threadpool_t *thrpool = (wkf_threadpool_t *) workerdata->thrpool; 01817 01818 while ((fctn = wkf_thread_run_barrier(&thrpool->runbar, NULL, NULL, &workerdata->parms)) != NULL) { 01819 (*fctn)(workerdata); 01820 } 01821 01822 return NULL; 01823 } 01824 01825 01826 static void * wkf_threadpool_workersync(void *voidparms) { 01827 return NULL; 01828 } 01829 #endif 01830 01831 01832 wkf_threadpool_t * wkf_threadpool_create(int workercount, int *devlist) { 01833 int i; 01834 wkf_threadpool_t *thrpool = NULL; 01835 thrpool = (wkf_threadpool_t *) malloc(sizeof(wkf_threadpool_t)); 01836 if (thrpool == NULL) 01837 return NULL; 01838 01839 memset(thrpool, 0, sizeof(wkf_threadpool_t)); 01840 01841 #if !defined(WKFTHREADS) 01842 workercount=1; 01843 #endif 01844 01845 /* if caller provides a device list, use it, otherwise we assume */ 01846 /* all workers are CPU cores */ 01847 thrpool->devlist = (int *) malloc(sizeof(int) * workercount); 01848 if (devlist == NULL) { 01849 for (i=0; i<workercount; i++) 01850 thrpool->devlist[i] = -1; /* mark as a CPU core */ 01851 } else { 01852 memcpy(thrpool->devlist, devlist, sizeof(int) * workercount); 01853 } 01854 01855 /* initialize shared iterator */ 01856 wkf_shared_iterator_init(&thrpool->iter); 01857 01858 /* initialize tile stack for error handling */ 01859 wkf_tilestack_init(&thrpool->errorstack, 64); 01860 01861 /* create a run barrier with N+1 threads: N workers, 1 master */ 01862 thrpool->workercount = workercount; 01863 wkf_thread_run_barrier_init(&thrpool->runbar, workercount+1); 01864 01865 /* allocate and initialize thread pool */ 01866 thrpool->threads = (wkf_thread_t *) malloc(sizeof(wkf_thread_t) * workercount); 01867 thrpool->workerdata = (wkf_threadpool_workerdata_t *) malloc(sizeof(wkf_threadpool_workerdata_t) * 
workercount); 01868 memset(thrpool->workerdata, 0, sizeof(wkf_threadpool_workerdata_t) * workercount); 01869 01870 /* setup per-worker data */ 01871 for (i=0; i<workercount; i++) { 01872 thrpool->workerdata[i].iter=&thrpool->iter; 01873 thrpool->workerdata[i].errorstack=&thrpool->errorstack; 01874 thrpool->workerdata[i].threadid=i; 01875 thrpool->workerdata[i].threadcount=workercount; 01876 thrpool->workerdata[i].devid=thrpool->devlist[i]; 01877 thrpool->workerdata[i].devspeed=1.0f; /* must be reset by dev setup code */ 01878 thrpool->workerdata[i].thrpool=thrpool; 01879 } 01880 01881 #if defined(WKFTHREADS) 01882 /* launch thread pool */ 01883 for (i=0; i<workercount; i++) { 01884 wkf_thread_create(&thrpool->threads[i], wkf_threadpool_workerproc, &thrpool->workerdata[i]); 01885 } 01886 #endif 01887 01888 return thrpool; 01889 } 01890 01891 01892 int wkf_threadpool_launch(wkf_threadpool_t *thrpool, 01893 void *fctn(void *), void *parms, int blocking) { 01894 if (thrpool == NULL) 01895 return -1; 01896 01897 #if defined(WKFTHREADS) 01898 /* wake sleeping threads to run fctn(parms) */ 01899 wkf_thread_run_barrier(&thrpool->runbar, fctn, parms, NULL); 01900 if (blocking) 01901 wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL); 01902 #else 01903 thrpool->workerdata[0].parms = parms; 01904 (*fctn)(&thrpool->workerdata[0]); 01905 #endif 01906 return 0; 01907 } 01908 01909 01910 int wkf_threadpool_wait(wkf_threadpool_t *thrpool) { 01911 #if defined(WKFTHREADS) 01912 wkf_thread_run_barrier(&thrpool->runbar, wkf_threadpool_workersync, NULL, NULL); 01913 #endif 01914 return 0; 01915 } 01916 01917 01918 int wkf_threadpool_poll(wkf_threadpool_t *thrpool) { 01919 #if defined(WKFTHREADS) 01920 return wkf_thread_run_barrier_poll(&thrpool->runbar); 01921 #else 01922 return 1; 01923 #endif 01924 } 01925 01926 01927 int wkf_threadpool_destroy(wkf_threadpool_t *thrpool) { 01928 #if defined(WKFTHREADS) 01929 int i; 01930 #endif 01931 01932 /* wake threads 
and tell them to shutdown */ 01933 wkf_thread_run_barrier(&thrpool->runbar, NULL, NULL, NULL); 01934 01935 #if defined(WKFTHREADS) 01936 /* join the pool of worker threads */ 01937 for (i=0; i<thrpool->workercount; i++) { 01938 wkf_thread_join(thrpool->threads[i], NULL); 01939 } 01940 #endif 01941 01942 /* destroy the thread barrier */ 01943 wkf_thread_run_barrier_destroy(&thrpool->runbar); 01944 01945 /* destroy the shared iterator */ 01946 wkf_shared_iterator_destroy(&thrpool->iter); 01947 01948 /* destroy tile stack for error handling */ 01949 wkf_tilestack_destroy(&thrpool->errorstack); 01950 01951 free(thrpool->devlist); 01952 free(thrpool->threads); 01953 free(thrpool->workerdata); 01954 free(thrpool); 01955 01956 return 0; 01957 } 01958 01959 01961 int wkf_threadpool_get_workercount(wkf_threadpool_t *thrpool) { 01962 return thrpool->workercount; 01963 } 01964 01965 01967 int wkf_threadpool_worker_getid(void *voiddata, int *threadid, int *threadcount) { 01968 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 01969 if (threadid != NULL) 01970 *threadid = worker->threadid; 01971 01972 if (threadcount != NULL) 01973 *threadcount = worker->threadcount; 01974 01975 return 0; 01976 } 01977 01978 01980 int wkf_threadpool_worker_getdevid(void *voiddata, int *devid) { 01981 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 01982 if (devid != NULL) 01983 *devid = worker->devid; 01984 01985 return 0; 01986 } 01987 01988 01995 int wkf_threadpool_worker_setdevspeed(void *voiddata, float speed) { 01996 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 01997 worker->devspeed = speed; 01998 return 0; 01999 } 02000 02001 02006 int wkf_threadpool_worker_getdevspeed(void *voiddata, float *speed) { 02007 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 02008 if (speed != NULL) 02009 *speed = worker->devspeed; 02010 return 0; 02011 } 02012 02013 02018 int 
wkf_threadpool_worker_devscaletile(void *voiddata, int *tilesize) { 02019 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 02020 if (tilesize != NULL) { 02021 int scaledtilesize; 02022 scaledtilesize = (int) (worker->devspeed * ((float) (*tilesize))); 02023 if (scaledtilesize < 1) 02024 scaledtilesize = 1; 02025 02026 *tilesize = scaledtilesize; 02027 } 02028 02029 return 0; 02030 } 02031 02032 02034 int wkf_threadpool_worker_getdata(void *voiddata, void **clientdata) { 02035 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voiddata; 02036 if (clientdata != NULL) 02037 *clientdata = worker->parms; 02038 02039 return 0; 02040 } 02041 02042 02044 int wkf_threadpool_sched_dynamic(wkf_threadpool_t *thrpool, wkf_tasktile_t *tile) { 02045 if (thrpool == NULL) 02046 return -1; 02047 return wkf_shared_iterator_set(&thrpool->iter, tile); 02048 } 02049 02050 02052 int wkf_threadpool_next_tile(void *voidparms, int reqsize, 02053 wkf_tasktile_t *tile) { 02054 int rc; 02055 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms; 02056 rc = wkf_shared_iterator_next_tile(worker->iter, reqsize, tile); 02057 if (rc == WKF_SCHED_DONE) { 02058 /* if the error stack is empty, then we're done, otherwise pop */ 02059 /* a tile off of the error stack and retry it */ 02060 if (wkf_tilestack_pop(worker->errorstack, tile) != WKF_TILESTACK_EMPTY) 02061 return WKF_SCHED_CONTINUE; 02062 } 02063 02064 return rc; 02065 } 02066 02067 02072 int wkf_threadpool_tile_failed(void *voidparms, wkf_tasktile_t *tile) { 02073 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms; 02074 return wkf_tilestack_push(worker->errorstack, tile); 02075 } 02076 02077 02078 /* worker thread calls this to indicate that an unrecoverable error occured */ 02079 int wkf_threadpool_setfatalerror(void *voidparms) { 02080 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms; 02081 
wkf_shared_iterator_setfatalerror(worker->iter); 02082 return 0; 02083 } 02084 02085 02086 /* worker thread calls this to indicate that an unrecoverable error occured */ 02087 int wkf_threadpool_getfatalerror(void *voidparms) { 02088 wkf_threadpool_workerdata_t *worker = (wkf_threadpool_workerdata_t *) voidparms; 02089 /* query error status for return to caller */ 02090 return wkf_shared_iterator_getfatalerror(worker->iter); 02091 } 02092 02093 02094 /* launch up to numprocs threads using shared iterator as a load balancer */ 02095 int wkf_threadlaunch(int numprocs, void *clientdata, void * fctn(void *), 02096 wkf_tasktile_t *tile) { 02097 wkf_shared_iterator_t iter; 02098 wkf_threadlaunch_t *parms=NULL; 02099 wkf_thread_t * threads=NULL; 02100 int i, rc; 02101 02102 /* XXX have to ponder what the right thing to do is here */ 02103 #if !defined(WKFTHREADS) 02104 numprocs=1; 02105 #endif 02106 02107 /* initialize shared iterator and set the iteration and range */ 02108 wkf_shared_iterator_init(&iter); 02109 if (wkf_shared_iterator_set(&iter, tile)) 02110 return -1; 02111 02112 /* allocate array of threads */ 02113 threads = (wkf_thread_t *) calloc(numprocs * sizeof(wkf_thread_t), 1); 02114 if (threads == NULL) 02115 return -1; 02116 02117 /* allocate and initialize array of thread parameters */ 02118 parms = (wkf_threadlaunch_t *) malloc(numprocs * sizeof(wkf_threadlaunch_t)); 02119 if (parms == NULL) { 02120 free(threads); 02121 return -1; 02122 } 02123 for (i=0; i<numprocs; i++) { 02124 parms[i].iter = &iter; 02125 parms[i].threadid = i; 02126 parms[i].threadcount = numprocs; 02127 parms[i].clientdata = clientdata; 02128 } 02129 02130 #if defined(WKFTHREADS) 02131 if (numprocs == 1) { 02132 /* XXX we special-case the single worker thread */ 02133 /* scenario because this greatly reduces the */ 02134 /* GPU kernel launch overhead since a new */ 02135 /* contexts doesn't have to be created, and */ 02136 /* in the simplest case with a single-GPU we */ 02137 /* will 
just be using the same device anyway */ 02138 /* Ideally we shouldn't need to do this.... */ 02139 /* single thread does all of the work */ 02140 fctn((void *) &parms[0]); 02141 } else { 02142 /* spawn child threads to do the work */ 02143 for (i=0; i<numprocs; i++) { 02144 wkf_thread_create(&threads[i], fctn, &parms[i]); 02145 } 02146 02147 /* join the threads after work is done */ 02148 for (i=0; i<numprocs; i++) { 02149 wkf_thread_join(threads[i], NULL); 02150 } 02151 } 02152 #else 02153 /* single thread does all of the work */ 02154 fctn((void *) &parms[0]); 02155 #endif 02156 02157 /* free threads/parms */ 02158 free(parms); 02159 free(threads); 02160 02161 /* query error status for return to caller */ 02162 rc=wkf_shared_iterator_getfatalerror(&iter); 02163 02164 /* destroy the shared iterator */ 02165 wkf_shared_iterator_destroy(&iter); 02166 02167 return rc; 02168 } 02169 02170 02172 int wkf_threadlaunch_getid(void *voidparms, int *threadid, int *threadcount) { 02173 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms; 02174 if (threadid != NULL) 02175 *threadid = worker->threadid; 02176 02177 if (threadcount != NULL) 02178 *threadcount = worker->threadcount; 02179 02180 return 0; 02181 } 02182 02183 02185 int wkf_threadlaunch_getdata(void *voidparms, void **clientdata) { 02186 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms; 02187 if (clientdata != NULL) 02188 *clientdata = worker->clientdata; 02189 02190 return 0; 02191 } 02192 02193 02195 int wkf_threadlaunch_next_tile(void *voidparms, int reqsize, 02196 wkf_tasktile_t *tile) { 02197 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms; 02198 return wkf_shared_iterator_next_tile(worker->iter, reqsize, tile); 02199 } 02200 02201 02203 int wkf_threadlaunch_setfatalerror(void *voidparms) { 02204 wkf_threadlaunch_t *worker = (wkf_threadlaunch_t *) voidparms; 02205 return wkf_shared_iterator_setfatalerror(worker->iter); 02206 } 02207 02208 02209 #ifdef __cplusplus 02210 } 
02211 #endif 02212 02213