/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

#include "platform.h"
#include <stdio.h>      /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */
#include <stdlib.h>     /* malloc, free */
#include <assert.h>
#include <errno.h>      /* errno */

#if defined (_MSC_VER)
#  include <sys/stat.h>
#  include <io.h>
#endif

#include "fileio_asyncio.h"
#include "fileio_common.h"

/* **********************************************************************
 *  Sparse write
 ************************************************************************/

/** AIO_fwriteSparse() :
 *  Writes `bufferSize` bytes from `buffer` into `file`.
 *  - test mode : nothing is written, 0 is returned.
 *  - sparse support disabled : one plain fwrite() of the whole buffer, 0 is returned.
 *  - sparse support enabled : the buffer is scanned in 32 KB segments. Leading zero
 *    words of a segment are not written but added to the pending skip count.
 *    As soon as a segment contains data, the pending skip is applied with
 *    LONG_SEEK() and the rest of that segment is written.
 *    A trailing part smaller than sizeof(size_t) is handled byte by byte.
 * @return : storedSkips,
 *           argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
static unsigned
AIO_fwriteSparse(FILE* file,
                 const void* buffer, size_t bufferSize,
                 const FIO_prefs_t* const prefs,
                 unsigned storedSkips)
{
    static const size_t wordsPerSegment = (32 KB) / sizeof(size_t);   /* check every 32 KB */
    static size_t const wordMask = sizeof(size_t)-1;
    /* Buffer is supposed malloc'ed, hence aligned on size_t */
    const size_t* const words = (const size_t*)buffer;
    size_t wordsLeft = bufferSize / sizeof(size_t);
    const size_t* const wordsEnd = words + wordsLeft;
    const size_t* cursor;

    /* do not output anything in test mode */
    if (prefs->testMode) {
        return 0;
    }

    /* normal (non-sparse) write */
    if (!prefs->sparseFileSupport) {
        if (fwrite(buffer, 1, bufferSize, file) != bufferSize) {
            EXM_THROW(70, "Write error : cannot write block : %s",
                      strerror(errno));
        }
        return 0;
    }

    /* avoid int overflow : flush 1 GB worth of pending skip right away */
    if (storedSkips > 1 GB) {
        if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) {
            EXM_THROW(91, "1 GB skip error (sparse file support)");
        }
        storedSkips -= 1 GB;
    }

    /* word-aligned part, one segment at a time */
    for (cursor = words; cursor < wordsEnd; ) {
        /* the last segment may be shorter than 32 KB */
        size_t const segWords = (wordsLeft < wordsPerSegment) ? wordsLeft : wordsPerSegment;
        size_t zeroWords = 0;
        wordsLeft -= segWords;

        /* count leading zeroes */
        while ((zeroWords < segWords) && (cursor[zeroWords] == 0)) {
            zeroWords++;
        }
        storedSkips += (unsigned)(zeroWords * sizeof(size_t));

        if (zeroWords != segWords) {   /* not all 0s */
            size_t const dataWords = segWords - zeroWords;
            /* skip leading zeros */
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) {
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            }
            storedSkips = 0;
            /* write the rest */
            if (fwrite(cursor + zeroWords, sizeof(size_t), dataWords, file) != dataWords) {
                EXM_THROW(93, "Write error : cannot write block : %s",
                          strerror(errno));
            }
        }
        cursor += segWords;
    }

    /* size not multiple of sizeof(size_t) : implies end of block */
    if (bufferSize & wordMask) {
        const char* const tailBegin = (const char*)wordsEnd;
        const char* const tailEnd = (const char*)buffer + bufferSize;
        const char* tail = tailBegin;
        assert(tailEnd > tailBegin && tailEnd < tailBegin + sizeof(size_t));
        while ((tail < tailEnd) && (*tail == 0)) {
            tail++;
        }
        storedSkips += (unsigned) (tail - tailBegin);
        if (tail != tailEnd) {
            /* not all remaining bytes are 0 */
            size_t const tailSize = (size_t)(tailEnd - tail);
            if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) {
                EXM_THROW(92, "Sparse skip error ; try --no-sparse");
            }
            if (fwrite(tail, 1, tailSize, file) != tailSize) {
                EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
                          strerror(errno));
            }
            storedSkips = 0;
        }
    }

    return storedSkips;
}

/* AIO_fwriteSparseEnd() :
 * Finishes a sequence of AIO_fwriteSparse() calls on `file`.
 * When some skip is still pending, seeks forward by (storedSkips - 1)
 * and writes one explicit zero byte. */
static void
AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
    /* nothing is ever skipped in test mode */
    assert(!prefs->testMode || storedSkips == 0);
    (void)prefs;   /* assert can be disabled, in which case prefs becomes unused */
    if (storedSkips == 0) {
        return;
    }
    assert(prefs->sparseFileSupport > 0);  /* storedSkips>0 implies sparse support is enabled */
    if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) {
        EXM_THROW(69, "Final skip error (sparse file support)");
    }
    /* last zero must be explicitly written,
     * so that skipped ones get implicitly translated as zero by FS */
    {   const char lastZeroByte[1] = { 0 };
        if (fwrite(lastZeroByte, 1, 1, file) != 1) {
            EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
        }
    }
}


/* **********************************************************************
 *  AsyncIO functionality
 ************************************************************************/

/* AIO_supported:
 * Returns 1 if AsyncIO is supported on the system, 0 otherwise.
 * The answer is fixed at build time by ZSTD_MULTITHREAD. */
int AIO_supported(void)
{
#if defined(ZSTD_MULTITHREAD)
    return 1;
#else
    return 0;
#endif
}

/* ***********************************
 *  Generic IoPool implementation
 *************************************/

/* AIO_IOPool_createIoJob:
 * Allocates one job descriptor together with a data buffer of `bufferSize` bytes
 * and binds the job to `ctx`. Throws (101) when either allocation fails. */
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize)
{
    IOJob_t* const newJob = (IOJob_t*) malloc(sizeof(IOJob_t));
    void* const payload = malloc(bufferSize);
    if (newJob == NULL || payload == NULL) {
        EXM_THROW(101, "Allocation error : not enough memory");
    }
    newJob->ctx = ctx;
    newJob->file = NULL;
    newJob->buffer = payload;
    newJob->bufferSize = bufferSize;
    newJob->usedBufferSize = 0;
    newJob->offset = 0;
    return newJob;
}


/* AIO_IOPool_createThreadPool:
 * Creates a thread pool and a mutex for threaded IO pool.
 * When prefs->asyncIO is not set, the context is left without a thread pool. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs)
{
    ctx->threadPool = NULL;
    ctx->threadPoolActive = 0;
    if (!prefs->asyncIO) {
        return;
    }
    if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) {
        EXM_THROW(102,"Failed creating ioJobsMutex mutex");
    }
    /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
     * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
    assert(MAX_IO_JOBS >= 2);
    ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
    ctx->threadPoolActive = 1;
    if (!ctx->threadPool) {
        EXM_THROW(104, "Failed creating I/O thread pool");
    }
}

/* AIO_IOPool_init:
 * Sets up a new I/O pool : optional thread pool, then the full set of available jobs.
 * A pool backed by a thread pool owns MAX_IO_JOBS jobs, otherwise it owns 2. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize)
{
    int slot;
    AIO_IOPool_createThreadPool(ctx, prefs);
    ctx->prefs = prefs;
    ctx->poolFunction = poolFunction;
    ctx->jobBufferSize = bufferSize;
    ctx->file = NULL;
    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
    ctx->availableJobsCount = ctx->totalIoJobs;
    for (slot = 0; slot < ctx->availableJobsCount; slot++) {
        ctx->availableJobs[slot] = AIO_IOPool_createIoJob(ctx, bufferSize);
    }
}


/* AIO_IOPool_threadPoolActive:
 * Check if current operation uses thread pool.
 * Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx)
{
    return (ctx->threadPool != NULL) && (ctx->threadPoolActive != 0);
}


/* AIO_IOPool_lockJobsMutex:
 * Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx)
{
    if (AIO_IOPool_threadPoolActive(ctx)) {
        ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
    }
}

/* AIO_IOPool_unlockJobsMutex:
 * Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx)
{
    if (AIO_IOPool_threadPoolActive(ctx)) {
        ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
    }
}

/* AIO_IOPool_releaseIoJob:
 * Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job)
{
    IOPoolCtx_t* const pool = (IOPoolCtx_t *) job->ctx;
    AIO_IOPool_lockJobsMutex(pool);
    assert(pool->availableJobsCount < pool->totalIoJobs);
    pool->availableJobs[pool->availableJobsCount] = job;
    pool->availableJobsCount++;
    AIO_IOPool_unlockJobsMutex(pool);
}

/* AIO_IOPool_join:
 * Waits for all tasks in the pool to finish executing.
*/ static void AIO_IOPool_join(IOPoolCtx_t* ctx) { if(AIO_IOPool_threadPoolActive(ctx)) POOL_joinJobs(ctx->threadPool); } /* AIO_IOPool_setThreaded: * Allows (de)activating threaded mode, to be used when the expected overhead * of threading costs more than the expected gains. */ static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) { assert(threaded == 0 || threaded == 1); assert(ctx != NULL); if(ctx->threadPoolActive != threaded) { AIO_IOPool_join(ctx); ctx->threadPoolActive = threaded; } } /* AIO_IOPool_free: * Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) { int i; if(ctx->threadPool) { /* Make sure we finish all tasks and then free the resources */ AIO_IOPool_join(ctx); /* Make sure we are not leaking availableJobs */ assert(ctx->availableJobsCount == ctx->totalIoJobs); POOL_free(ctx->threadPool); ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex); } assert(ctx->file == NULL); for(i=0; i<ctx->availableJobsCount; i++) { IOJob_t* job = (IOJob_t*) ctx->availableJobs[i]; free(job->buffer); free(job); } } /* AIO_IOPool_acquireJob: * Returns an available io job to be used for a future io. */ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) { IOJob_t *job; assert(ctx->file != NULL || ctx->prefs->testMode); AIO_IOPool_lockJobsMutex(ctx); assert(ctx->availableJobsCount > 0); job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount]; AIO_IOPool_unlockJobsMutex(ctx); job->usedBufferSize = 0; job->file = ctx->file; job->offset = 0; return job; } /* AIO_IOPool_setFile: * Sets the destination file for future files in the pool. * Requires completion of all queued jobs and release of all otherwise acquired jobs. 
*/ static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) { assert(ctx!=NULL); AIO_IOPool_join(ctx); assert(ctx->availableJobsCount == ctx->totalIoJobs); ctx->file = file; } static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) { return ctx->file; } /* AIO_IOPool_enqueueJob: * Enqueues an io job for execution. * The queued job shouldn't be used directly after queueing it. */ static void AIO_IOPool_enqueueJob(IOJob_t* job) { IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx; if(AIO_IOPool_threadPoolActive(ctx)) POOL_add(ctx->threadPool, ctx->poolFunction, job); else ctx->poolFunction(job); } /* *********************************** * WritePool implementation *************************************/ /* AIO_WritePool_acquireJob: * Returns an available write job to be used for a future write. */ IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* ctx) { return AIO_IOPool_acquireJob(&ctx->base); } /* AIO_WritePool_enqueueAndReacquireWriteJob: * Queues a write job for execution and acquires a new one. * After execution `job`'s pointed value would change to the newly acquired job. * Make sure to set `usedBufferSize` to the wanted length before call. * The queued job shouldn't be used directly after queueing it. */ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) { AIO_IOPool_enqueueJob(*job); *job = AIO_IOPool_acquireJob((IOPoolCtx_t *)(*job)->ctx); } /* AIO_WritePool_sparseWriteEnd: * Ends sparse writes to the current file. * Blocks on completion of all current write jobs before executing. */ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) { assert(ctx != NULL); AIO_IOPool_join(&ctx->base); AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips); ctx->storedSkips = 0; } /* AIO_WritePool_setFile: * Sets the destination file for future writes in the pool. * Requires completion of all queues write jobs and release of all otherwise acquired jobs. * Also requires ending of sparse write if a previous file was used in sparse mode. 
*/ void AIO_WritePool_setFile(WritePoolCtx_t* ctx, FILE* file) { AIO_IOPool_setFile(&ctx->base, file); assert(ctx->storedSkips == 0); } /* AIO_WritePool_getFile: * Returns the file the writePool is currently set to write to. */ FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) { return AIO_IOPool_getFile(&ctx->base); } /* AIO_WritePool_releaseIoJob: * Releases an acquired job back to the pool. Doesn't execute the job. */ void AIO_WritePool_releaseIoJob(IOJob_t* job) { AIO_IOPool_releaseIoJob(job); } /* AIO_WritePool_closeFile: * Ends sparse write and closes the writePool's current file and sets the file to NULL. * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ int AIO_WritePool_closeFile(WritePoolCtx_t* ctx) { FILE* const dstFile = ctx->base.file; assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); AIO_WritePool_sparseWriteEnd(ctx); AIO_IOPool_setFile(&ctx->base, NULL); return fclose(dstFile); } /* AIO_WritePool_executeWriteJob: * Executes a write job synchronously. Can be used as a function for a thread pool. */ static void AIO_WritePool_executeWriteJob(void* opaque){ IOJob_t* const job = (IOJob_t*) opaque; WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx; ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); AIO_IOPool_releaseIoJob(job); } /* AIO_WritePool_create: * Allocates and sets and a new write pool including its included jobs. */ WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) { WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t)); if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize); ctx->storedSkips = 0; return ctx; } /* AIO_WritePool_free: * Frees and releases a writePool and its resources. Closes destination file if needs to. 
*/ void AIO_WritePool_free(WritePoolCtx_t* ctx) { /* Make sure we finish all tasks and then free the resources */ if(AIO_WritePool_getFile(ctx)) AIO_WritePool_closeFile(ctx); AIO_IOPool_destroy(&ctx->base); assert(ctx->storedSkips==0); free(ctx); } /* AIO_WritePool_setAsync: * Allows (de)activating async mode, to be used when the expected overhead * of asyncio costs more than the expected gains. */ void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) { AIO_IOPool_setThreaded(&ctx->base, async); } /* *********************************** * ReadPool implementation *************************************/ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) { int i; for(i=0; i<ctx->completedJobsCount; i++) { IOJob_t* job = (IOJob_t*) ctx->completedJobs[i]; AIO_IOPool_releaseIoJob(job); } ctx->completedJobsCount = 0; } static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) { ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; AIO_IOPool_lockJobsMutex(&ctx->base); assert(ctx->completedJobsCount < MAX_IO_JOBS); ctx->completedJobs[ctx->completedJobsCount++] = job; if(AIO_IOPool_threadPoolActive(&ctx->base)) { ZSTD_pthread_cond_signal(&ctx->jobCompletedCond); } AIO_IOPool_unlockJobsMutex(&ctx->base); } /* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked: * Looks through the completed jobs for a job matching the waitingOnOffset and returns it, * if job wasn't found returns NULL. * IMPORTANT: assumes ioJobsMutex is locked. */ static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) { IOJob_t *job = NULL; int i; /* This implementation goes through all completed jobs and looks for the one matching the next offset. * While not strictly needed for a single threaded reader implementation (as in such a case we could expect * reads to be completed in order) this implementation was chosen as it better fits other asyncio * interfaces (such as io_uring) that do not provide promises regarding order of completion. 
*/ for (i=0; i<ctx->completedJobsCount; i++) { job = (IOJob_t *) ctx->completedJobs[i]; if (job->offset == ctx->waitingOnOffset) { ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount]; return job; } } return NULL; } /* AIO_ReadPool_numReadsInFlight: * Returns the number of IO read jobs currently in flight. */ static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) { const int jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1); return (size_t)(ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld)); } /* AIO_ReadPool_getNextCompletedJob: * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset. * Would block. */ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) { IOJob_t *job = NULL; AIO_IOPool_lockJobsMutex(&ctx->base); job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */ while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) { assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */ ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex); job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx); } if(job) { assert(job->offset == ctx->waitingOnOffset); ctx->waitingOnOffset += job->usedBufferSize; } AIO_IOPool_unlockJobsMutex(&ctx->base); return job; } /* AIO_ReadPool_executeReadJob: * Executes a read job synchronously. Can be used as a function for a thread pool. 
*/ static void AIO_ReadPool_executeReadJob(void* opaque){ IOJob_t* const job = (IOJob_t*) opaque; ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx; if(ctx->reachedEof) { job->usedBufferSize = 0; AIO_ReadPool_addJobToCompleted(job); return; } job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file); if(job->usedBufferSize < job->bufferSize) { if(ferror(job->file)) { EXM_THROW(37, "Read error"); } else if(feof(job->file)) { ctx->reachedEof = 1; } else { EXM_THROW(37, "Unexpected short read"); } } AIO_ReadPool_addJobToCompleted(job); } static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) { IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base); job->offset = ctx->nextReadOffset; ctx->nextReadOffset += job->bufferSize; AIO_IOPool_enqueueJob(job); } static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) { while(ctx->base.availableJobsCount) { AIO_ReadPool_enqueueRead(ctx); } } /* AIO_ReadPool_setFile: * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. * Waits for all current enqueued tasks to complete if a previous file was set. */ void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) { assert(ctx!=NULL); AIO_IOPool_join(&ctx->base); AIO_ReadPool_releaseAllCompletedJobs(ctx); if (ctx->currentJobHeld) { AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); ctx->currentJobHeld = NULL; } AIO_IOPool_setFile(&ctx->base, file); ctx->nextReadOffset = 0; ctx->waitingOnOffset = 0; ctx->srcBuffer = ctx->coalesceBuffer; ctx->srcBufferLoaded = 0; ctx->reachedEof = 0; if(file != NULL) AIO_ReadPool_startReading(ctx); } /* AIO_ReadPool_create: * Allocates and sets and a new readPool including its included jobs. * bufferSize should be set to the maximal buffer we want to read at a time, will also be used * as our basic read size. 
*/ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) { ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t)); if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize); ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2); if(!ctx->coalesceBuffer) EXM_THROW(100, "Allocation error : not enough memory"); ctx->srcBuffer = ctx->coalesceBuffer; ctx->srcBufferLoaded = 0; ctx->completedJobsCount = 0; ctx->currentJobHeld = NULL; if(ctx->base.threadPool) if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL)) EXM_THROW(103,"Failed creating jobCompletedCond cond"); return ctx; } /* AIO_ReadPool_free: * Frees and releases a readPool and its resources. Closes source file. */ void AIO_ReadPool_free(ReadPoolCtx_t* ctx) { if(AIO_ReadPool_getFile(ctx)) AIO_ReadPool_closeFile(ctx); if(ctx->base.threadPool) ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond); AIO_IOPool_destroy(&ctx->base); free(ctx->coalesceBuffer); free(ctx); } /* AIO_ReadPool_consumeBytes: * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) { assert(n <= ctx->srcBufferLoaded); ctx->srcBufferLoaded -= n; ctx->srcBuffer += n; } /* AIO_ReadPool_releaseCurrentlyHeldAndGetNext: * Release the current held job and get the next one, returns NULL if no next job available. */ static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) { if (ctx->currentJobHeld) { AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld); ctx->currentJobHeld = NULL; AIO_ReadPool_enqueueRead(ctx); } ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx); return (IOJob_t*) ctx->currentJobHeld; } /* AIO_ReadPool_fillBuffer: * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller). 
* Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file. * Return value is the number of bytes added to the buffer. * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */ size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) { IOJob_t *job; int useCoalesce = 0; if(n > ctx->base.jobBufferSize) n = ctx->base.jobBufferSize; /* We are good, don't read anything */ if (ctx->srcBufferLoaded >= n) return 0; /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job * and coalesce the remaining bytes with the next job's buffer */ if (ctx->srcBufferLoaded > 0) { useCoalesce = 1; memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded); ctx->srcBuffer = ctx->coalesceBuffer; } /* Read the next chunk */ job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx); if(!job) return 0; if(useCoalesce) { assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize); memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize); ctx->srcBufferLoaded += job->usedBufferSize; } else { ctx->srcBuffer = (U8 *) job->buffer; ctx->srcBufferLoaded = job->usedBufferSize; } return job->usedBufferSize; } /* AIO_ReadPool_consumeAndRefill: * Consumes the current buffer and refills it with bufferSize bytes. */ size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) { AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded); return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize); } /* AIO_ReadPool_getFile: * Returns the current file set for the read pool. */ FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) { return AIO_IOPool_getFile(&ctx->base); } /* AIO_ReadPool_closeFile: * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. 
*/
int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx)
{
    /* grab the handle first : detaching the file resets the pool's file to NULL */
    FILE* const srcFile = AIO_ReadPool_getFile(ctx);
    int closeResult;
    AIO_ReadPool_setFile(ctx, NULL);
    closeResult = fclose(srcFile);
    return closeResult;
}

/* AIO_ReadPool_setAsync:
 * Allows (de)activating async mode, to be used when the expected overhead
 * of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async)
{
    IOPoolCtx_t* const pool = &ctx->base;
    AIO_IOPool_setThreaded(pool, async);
}