Orion
Barry Adding pipes e59e4fe (2 years, 4 months ago)/* * This file implements the VFS pipe mechanism. It supports both named and * anonymous pipes (FIFOs and pipes), and uses the same underlying operations * for both. It uses a ring buffer to implement the reading/writing. */ #include <stdint.h> #include <stddef.h> #include <string.h> #include <errno.h> #include "vfs.h" #include "cache.h" #include "inode.h" #include "../mem/heap.h" #include "../mem/paging.h" #include "../mem/frame.h" #include "../task/task.h" /* Structure for pipe data */ typedef struct Pipe { size_t size; off_t head, tail; size_t readers, writers; TaskQueue readQueue, writeQueue; } Pipe; size_t pipe_read(File *file, char *buf, size_t size, off_t offset); size_t pipe_write(File *file, char *buf, size_t size, off_t offset); int pipe_release(File *file); FileOps pipeWriteFileOps = { .write = pipe_write, .release = pipe_release, }; FileOps pipeReadFileOps = { .read = pipe_read, .release = pipe_release, }; /* Free a Pipe */ static void pipe_free_extra(Inode *inode) { Pipe *pipe = inode->extra; /* Resume waiting tasks */ Task *task; while ((task = pop_from_queue(&pipe->readQueue)) != NULL) { unblock_task(task); send_sig(task, SIGPIPE); } while ((task = pop_from_queue(&pipe->writeQueue)) != NULL) { unblock_task(task); send_sig(task, SIGPIPE); } } /* Open an anonymous pipe */ int pipe(int pipefd[2]) { if (!verify_access(pipefd, sizeof(int) * 2, PROT_WRITE)) return -EFAULT; /* Allocate file descriptors */ int freefd = 0, i; for (i = 0; i < NFILES; i++) freefd += !current->files->fd[i]; if (freefd < 2) return -EMFILE; /* Create endpoints */ File *file[2]; Pipe *pipe = kmalloc(sizeof(Pipe)); Inode *inode = kmalloc(sizeof(Inode)); for (i = 0; i < 2; i++) { file[i] = kmalloc(sizeof(File)); /* Install file descriptors */ for (pipefd[i] = 0; pipefd[i] < NFILES; pipefd[i]++) if (!current->files->fd[pipefd[i]]) break; current->files->fd[pipefd[i]] = file[i]; /* Build files */ file[i]->inode = inode_get(inode); file[i]->mode = S_IFIFO | 0666; file[i]->uid = current->uid; file[i]->gid = current->gid; file[i]->usage = 1; } file[0]->ops = &pipeReadFileOps; file[1]->ops = &pipeWriteFileOps; inode->extra = pipe; pipe->readers = pipe->writers = 1; page_create(inode, alloc_frames(1), 0); } /* Read from a pipe */ size_t pipe_read(File *file, char *buf, size_t size, off_t offset) { offset %= 4096; Pipe *pipe = file->inode->extra; if (!pipe->writers) { send_sig(current, SIGPIPE); return 0; } Task *tmp; Page *page = page_find(file->inode, 0); acquire(&quickPageLock); page_t oldPage = quick_page(page->frame); size_t count = 0, avail, min, minPart; while (size) { avail = pipe->size; if (!avail) { quick_page(oldPage); release(&quickPageLock); if (!pipe->writers) goto end; while (tmp = pop_from_queue(&pipe->writeQueue)) unblock_task(tmp); add_to_queue(&pipe->readQueue, current); block_task(WAITING_FOR_IO); acquire(&quickPageLock); oldPage = quick_page(page->frame); } avail = pipe->size; min = (size < avail) ? size : avail; if (pipe->tail + min >= 4096) { minPart = 4096 - pipe->tail; memcpy(buf + count, QUICK_PAGE + pipe->tail, minPart); memcpy(buf + count + minPart, QUICK_PAGE, min - minPart); } else { memcpy(buf + count, QUICK_PAGE + pipe->tail, min); } size -= min; count += min; pipe->tail += min; pipe->tail %= 4096; pipe->size -= min; } quick_page(oldPage); release(&quickPageLock); end: while (tmp = pop_from_queue(&pipe->writeQueue)) unblock_task(tmp); return count; } /* Write to a pipe */ size_t pipe_write(File *file, char *buf, size_t size, off_t offset) { offset %= 4096; Pipe *pipe = file->inode->extra; if (!pipe->readers) { send_sig(current, SIGPIPE); return 0; } Task *tmp; Page *page = page_find(file->inode, 0); acquire(&quickPageLock); page_t oldPage = quick_page(page->frame); size_t count = 0, space, min, minPart; while (size) { space = 4096 - pipe->size; if (!space) { quick_page(oldPage); release(&quickPageLock); while (tmp = pop_from_queue(&pipe->readQueue)) unblock_task(tmp); add_to_queue(&pipe->writeQueue, current); block_task(WAITING_FOR_IO); acquire(&quickPageLock); oldPage = quick_page(page->frame); } space = 4096 - pipe->size; min = (size < space) ? size : space; if (pipe->head + min >= 4096) { minPart = 4096 - pipe->head; memcpy(QUICK_PAGE + pipe->head, buf + count, minPart); memcpy(QUICK_PAGE, buf + count + minPart, min - minPart); } else { memcpy(QUICK_PAGE + pipe->head, buf + count, min); } size -= min; count += min; pipe->head += min; pipe->head %= 4096; pipe->size += min; } quick_page(oldPage); release(&quickPageLock); while (tmp = pop_from_queue(&pipe->readQueue)) unblock_task(tmp); return count; } int pipe_release(File *file) { Task *tmp; Pipe *pipe = file->inode->extra; TaskQueue *queue = (file->ops == &pipeReadFileOps) ? &pipe->writeQueue : &pipe->readQueue; while (tmp = pop_from_queue(queue)) { unblock_task(tmp); send_sig(tmp, SIGPIPE); } if (queue == &pipe->readQueue) pipe->writers--; else pipe->readers--; }