Orion
Barry Adding pipes e59e4fe (2 years, 4 months ago)diff --git a/vfs/pipe.c b/vfs/pipe.c new file mode 100644 index 0000000..0566597 --- /dev/null +++ b/vfs/pipe.c @@ -0,0 +1,234 @@ +/* + * This file implements the VFS pipe mechanism. It supports both named and + * anonymous pipes (FIFOs and pipes), and uses the same underlying operations + * for both. It uses a ring buffer to implement the reading/writing. + */ + +#include <stdint.h> +#include <stddef.h> +#include <string.h> +#include <errno.h> +#include "vfs.h" +#include "cache.h" +#include "inode.h" +#include "../mem/heap.h" +#include "../mem/paging.h" +#include "../mem/frame.h" +#include "../task/task.h" + +/* Structure for pipe data */ +typedef struct Pipe { + size_t size; + off_t head, tail; + size_t readers, writers; + TaskQueue readQueue, writeQueue; +} Pipe; + +size_t pipe_read(File *file, char *buf, size_t size, off_t offset); +size_t pipe_write(File *file, char *buf, size_t size, off_t offset); +int pipe_release(File *file); + +FileOps pipeWriteFileOps = { + .write = pipe_write, + .release = pipe_release, +}; +FileOps pipeReadFileOps = { + .read = pipe_read, + .release = pipe_release, +}; + +/* Free a Pipe */ +static void +pipe_free_extra(Inode *inode) +{ + Pipe *pipe = inode->extra; + + /* Resume waiting tasks */ + Task *task; + while ((task = pop_from_queue(&pipe->readQueue)) != NULL) { + unblock_task(task); + send_sig(task, SIGPIPE); + } + while ((task = pop_from_queue(&pipe->writeQueue)) != NULL) { + unblock_task(task); + send_sig(task, SIGPIPE); + } +} + +/* Open an anonymous pipe */ +int +pipe(int pipefd[2]) +{ + if (!verify_access(pipefd, sizeof(int) * 2, PROT_WRITE)) + return -EFAULT; + + /* Allocate file descriptors */ + int freefd = 0, i; + for (i = 0; i < NFILES; i++) + freefd += !current->files->fd[i]; + if (freefd < 2) + return -EMFILE; + + /* Create endpoints */ + File *file[2]; + Pipe *pipe = kmalloc(sizeof(Pipe)); + Inode *inode = kmalloc(sizeof(Inode)); + for (i = 0; i < 2; i++) { + file[i] = kmalloc(sizeof(File)); + + /* Install file descriptors */ + for (pipefd[i] = 0; pipefd[i] < NFILES; pipefd[i]++) + if (!current->files->fd[pipefd[i]]) + break; + current->files->fd[pipefd[i]] = file[i]; + + /* Build files */ + file[i]->inode = inode_get(inode); + file[i]->mode = S_IFIFO | 0666; + file[i]->uid = current->uid; + file[i]->gid = current->gid; + file[i]->usage = 1; + } + file[0]->ops = &pipeReadFileOps; + file[1]->ops = &pipeWriteFileOps; + inode->extra = pipe; + pipe->readers = pipe->writers = 1; + page_create(inode, alloc_frames(1), 0); +} + +/* Read from a pipe */ +size_t +pipe_read(File *file, char *buf, size_t size, off_t offset) +{ + offset %= 4096; + + Pipe *pipe = file->inode->extra; + if (!pipe->writers) { + send_sig(current, SIGPIPE); + return 0; + } + + Task *tmp; + Page *page = page_find(file->inode, 0); + acquire(&quickPageLock); + page_t oldPage = quick_page(page->frame); + size_t count = 0, avail, min, minPart; + while (size) { + avail = pipe->size; + if (!avail) { + quick_page(oldPage); + release(&quickPageLock); + + if (!pipe->writers) + goto end; + + while (tmp = pop_from_queue(&pipe->writeQueue)) + unblock_task(tmp); + add_to_queue(&pipe->readQueue, current); + block_task(WAITING_FOR_IO); + + acquire(&quickPageLock); + oldPage = quick_page(page->frame); + } + avail = pipe->size; + min = (size < avail) ? size : avail; + if (pipe->tail + min >= 4096) { + minPart = 4096 - pipe->tail; + memcpy(buf + count, QUICK_PAGE + pipe->tail, minPart); + memcpy(buf + count + minPart, QUICK_PAGE, + min - minPart); + } else { + memcpy(buf + count, QUICK_PAGE + pipe->tail, min); + } + + size -= min; + count += min; + pipe->tail += min; + pipe->tail %= 4096; + pipe->size -= min; + } + + quick_page(oldPage); + release(&quickPageLock); + +end: + while (tmp = pop_from_queue(&pipe->writeQueue)) + unblock_task(tmp); + + return count; +} + +/* Write to a pipe */ +size_t +pipe_write(File *file, char *buf, size_t size, off_t offset) +{ + offset %= 4096; + + Pipe *pipe = file->inode->extra; + if (!pipe->readers) { + send_sig(current, SIGPIPE); + return 0; + } + + Task *tmp; + Page *page = page_find(file->inode, 0); + acquire(&quickPageLock); + page_t oldPage = quick_page(page->frame); + size_t count = 0, space, min, minPart; + while (size) { + space = 4096 - pipe->size; + if (!space) { + quick_page(oldPage); + release(&quickPageLock); + + while (tmp = pop_from_queue(&pipe->readQueue)) + unblock_task(tmp); + add_to_queue(&pipe->writeQueue, current); + block_task(WAITING_FOR_IO); + + acquire(&quickPageLock); + oldPage = quick_page(page->frame); + } + space = 4096 - pipe->size; + min = (size < space) ? size : space; + if (pipe->head + min >= 4096) { + minPart = 4096 - pipe->head; + memcpy(QUICK_PAGE + pipe->head, buf + count, minPart); + memcpy(QUICK_PAGE, buf + count + minPart, + min - minPart); + } else { + memcpy(QUICK_PAGE + pipe->head, buf + count, min); + } + + size -= min; + count += min; + pipe->head += min; + pipe->head %= 4096; + pipe->size += min; + } + + quick_page(oldPage); + release(&quickPageLock); + + while (tmp = pop_from_queue(&pipe->readQueue)) + unblock_task(tmp); + + return count; +} + +int +pipe_release(File *file) +{ + Task *tmp; + Pipe *pipe = file->inode->extra; + TaskQueue *queue = (file->ops == &pipeReadFileOps) + ? &pipe->writeQueue : &pipe->readQueue; + while (tmp = pop_from_queue(queue)) { + unblock_task(tmp); + send_sig(tmp, SIGPIPE); + } + if (queue == &pipe->readQueue) + pipe->writers--; + else + pipe->readers--; +}