// shm-test.cpp : Defines the entry point for the console application. // #include "stdafx.h" static volatile unsigned running = 1; typedef __int64 int64_t; typedef int int32_t; struct timeStamp { //int64_t filler[4]; // Locate each timestamp in a separate cache line! int64_t clockTimeStampSec; int64_t receiveTimeStampSec; int32_t clockTimeStampUSec; int32_t receiveTimeStampUSec; int32_t leap; int32_t precision; }; #define TIMESTAMP_BUFFER_MIN_SIZE 2 #define TIMESTAMP_BUFFER_MAX_SIZE 64 #define TIMESTAMP_BUFFER_DEFAULT_SIZE 4 #define MIN_THREADS 2 #define MAX_THREADS 256 #define DEFAULT_THREADS 4 #define DEFAULT_TIME 60 #define GPSD_NTP_MARKER 0x50544E2D44535047 struct shmTime { int64_t marker; /* 0x50544E2D44535047 == GPSD-NTP in LE */ int32_t mode; /* 4 + TIMESTAMP_BUFFER_SIZE * 256 */ int32_t count; int32_t nsamples; int32_t valid; int64_t filler; // Make the header part 32 bytes struct timeStamp ts[TIMESTAMP_BUFFER_MAX_SIZE]; }; volatile struct shmTime *shm = NULL; int32_t timestamp_buffer_size = TIMESTAMP_BUFFER_DEFAULT_SIZE; int nrOfThreads = DEFAULT_THREADS; int timeToRun = DEFAULT_TIME; struct threadReport { int64_t count; int64_t counter_update; int64_t counter_retry; int64_t timestamps_inconsistent; }; DWORD WINAPI writer(LPVOID par) { int64_t count = 0; int32_t timestamp_buffer_mask = timestamp_buffer_size-1; int32_t timestamp_buffer_limit = timestamp_buffer_mask - 1; struct threadReport *tr = (struct threadReport *) par; memset((void *) shm, 0, sizeof(struct shmTime)- (sizeof(struct timeStamp)*(TIMESTAMP_BUFFER_MAX_SIZE - timestamp_buffer_size))); shm->mode = 4 + timestamp_buffer_mask * 256; shm->valid = 1; _ReadWriteBarrier(); shm->marker = GPSD_NTP_MARKER; do { int32_t counter = shm->count; int32_t c = (counter + 1) & timestamp_buffer_mask; int64_t qpc; QueryPerformanceCounter((LARGE_INTEGER *) &qpc); shm->ts[c].clockTimeStampSec = qpc; shm->ts[c].receiveTimeStampSec = qpc; shm->ts[c].clockTimeStampUSec = counter; shm->ts[c].receiveTimeStampUSec = counter; shm->ts[c].precision = -19; shm->ts[c].leap = 0; _ReadWriteBarrier(); shm->count++; count++; } while (running); tr->count = count; return 0; } DWORD WINAPI reader(LPVOID par) { int64_t count = 0, cs, rs; int32_t cus, rus; int32_t counter, ncnt; int64_t lcounter_update = 0, lcounter_retry = 0, ltimestamps_inconsistent = 0; int32_t timestamp_buffer_mask; int32_t timestamp_buffer_limit; struct threadReport *tr = (struct threadReport *) par; // Wait until the writer have initialized everything... do { } while (shm->marker != GPSD_NTP_MARKER || shm->valid == 0 || shm->mode == 0); if ((shm->mode & 255) < 4) return 1; // Wrong mode! timestamp_buffer_mask = (shm->mode >> 8); timestamp_buffer_limit = timestamp_buffer_mask - 1; do { int32_t c; retry: counter = shm->count; //_ReadWriteBarrier(); c = counter & timestamp_buffer_mask; cs = shm->ts[c].clockTimeStampSec; rs = shm->ts[c].receiveTimeStampSec; cus = shm->ts[c].clockTimeStampUSec; rus = shm->ts[c].receiveTimeStampUSec; //_ReadWriteBarrier(); ncnt = shm->count; if (ncnt != counter) { lcounter_update++; if ((ncnt - counter) >= timestamp_buffer_limit) { lcounter_retry++; goto retry; } ncnt = counter; } if (cs != rs || cus != rus) ltimestamps_inconsistent++; count++; } while (running); if (count == 0) count = 1; tr->count = count; tr->counter_update = lcounter_update; tr->counter_retry = lcounter_retry; tr->timestamps_inconsistent = ltimestamps_inconsistent; return 0; } volatile struct threadReport treport[MAX_THREADS]; int syntax(void) { fprintf(stderr, "Usage shm-test [options]\n"\ " -b buffer_slots (default 4, range 2-64 in powers of two)\n"\ " -n nr_of_threads (default 4, range 2-256)\n"\ " -t seconds_to_run (default 60)\n"); return 1; } int nval(_TCHAR *s) { int n = 0; _TCHAR c; while ((c = *s++) != '\0') { if ((c >= '0') && (c <= '9')) { n = n*10 + (c - '0'); } } return n; } int _tmain(int argc, _TCHAR* argv[]) { DWORD threadIDs[MAX_THREADS]; HANDLE h[MAX_THREADS]; DWORD_PTR pthr; DWORD_PTR wpa; void * shm_base; int i,a; _TCHAR *arg; int64_t c = -1; // Check any cmdline parms: for (a = 1; a < argc; a++) { arg = argv[a]; if (*arg == '-') { a++; arg++; if (*arg == 'b') { // buffer slots timestamp_buffer_size = nval(argv[a]); if (timestamp_buffer_size < TIMESTAMP_BUFFER_MIN_SIZE) timestamp_buffer_size = TIMESTAMP_BUFFER_MIN_SIZE; if (timestamp_buffer_size > TIMESTAMP_BUFFER_MAX_SIZE) timestamp_buffer_size = TIMESTAMP_BUFFER_MAX_SIZE; if (timestamp_buffer_size & (timestamp_buffer_size-1)) timestamp_buffer_size = TIMESTAMP_BUFFER_DEFAULT_SIZE; } else if (*arg == 'n') { // total # of threads to run nrOfThreads = nval(argv[a]); if (nrOfThreads < MIN_THREADS) nrOfThreads = MIN_THREADS; if (nrOfThreads > MAX_THREADS) nrOfThreads = MAX_THREADS; } else if (*arg == 't') { // seconds to run timeToRun = nval(argv[a]); } else return syntax(); } else return syntax(); } printf("Testing with %d buffer slots, %d threads for %d seconds\n", timestamp_buffer_size, nrOfThreads, timeToRun); memset((void *) treport, 0, sizeof(treport)); // Setup the shm record: shm_base = malloc(sizeof(struct shmTime)+63); (volatile struct shmTime *) shm = (volatile struct shmTime *) (((DWORD_PTR) shm_base + 63)& ~63); h[0] = CreateThread(NULL, 0, &writer, (LPVOID) &treport[0], 0, &threadIDs[0]); // Pick the first core for the writer thread: pthr = 1; wpa = SetThreadAffinityMask(h[0], pthr); printf("Writer affinity = %x, old = %x\n", pthr, wpa); // Reader threads should stay off the writer core: pthr = wpa & (DWORD_PTR) -2; // All cores except the first! for (i = 1; i < nrOfThreads; i++) { h[i] = CreateThread(NULL, 0, &reader, (LPVOID) &treport[i], 0, &threadIDs[i]); wpa = SetThreadAffinityMask(h[i], pthr); printf("Reader %d affinity = %x, old = %x\n", i, pthr, wpa); } for (i = 0; i < timeToRun; i++) { fprintf(stderr, "%d/%d\r", i, timeToRun); Sleep(1000); } running = 0; printf("Done waiting %d seconds\n", timeToRun); // Wait for up to 1 second, until all threads have had a chance to update their reports: i = 10; do { int t; c = -1; Sleep(100); for (t = 0; t < nrOfThreads; t++) c &= treport[t].count; i--; } while (c == 0 && i); printf("\nAll threads done! (i = %d)\n", i); printf("Writer:\n%lld iterations, %10.5g iterations/second\n", treport[0].count, 1.0* treport[0].count / timeToRun); treport[0].count = 0; for (i = 1; i < nrOfThreads; i++) { printf("Thread %d:\n%lld iterations, %lld updating %lld retries %lld errors\n", i, treport[i].count, treport[i].counter_update, treport[i].counter_retry, treport[i].timestamps_inconsistent); // Use the writer slot to accumulate all the reader counts: treport[0].count += treport[i].count; treport[0].counter_update += treport[i].counter_update; treport[0].counter_retry += treport[i].counter_retry; treport[0].timestamps_inconsistent += treport[i].timestamps_inconsistent; } printf("Reader totals:\n%lld iterations, %lld updating %lld retries %lld errors\n", treport[0].count, treport[0].counter_update, treport[0].counter_retry, treport[0].timestamps_inconsistent); printf("Reader totals:\n%10.5g iterations/second, %5.2f%% updating %5.2f%% retries %5.3f%% errors\n", 1.0 * treport[0].count/ timeToRun, 100.0 * treport[0].counter_update / treport[0].count, 100.0 * treport[0].counter_retry / treport[0].count, 100.0 * treport[0].timestamps_inconsistent / treport[0].count); return 0; }