Orion
Barry Adding pipes e59e4fe (3 years, 3 months ago)
/*
* This file implements the VFS pipe mechanism. It supports both named and
* anonymous pipes (FIFOs and pipes), and uses the same underlying operations
* for both. It uses a ring buffer to implement the reading/writing.
*/
#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include <errno.h>
#include "vfs.h"
#include "cache.h"
#include "inode.h"
#include "../mem/heap.h"
#include "../mem/paging.h"
#include "../mem/frame.h"
#include "../task/task.h"
/*
 * Per-pipe bookkeeping, attached to the pipe's inode through inode->extra.
 * The bytes themselves live in a 4096-byte ring buffer; this structure only
 * tracks the fill level, the two cursors and who is using the pipe.
 */
typedef struct Pipe {
/* Number of bytes currently stored in the ring buffer */
size_t size;
/* head: offset the next write goes to; tail: offset the next read comes from */
off_t head, tail;
/* Count of open read ends and open write ends */
size_t readers, writers;
/* Tasks blocked in pipe_read() / pipe_write() respectively */
TaskQueue readQueue, writeQueue;
} Pipe;
/* Forward declarations so the FileOps tables below can reference them */
size_t pipe_read(File *file, char *buf, size_t size, off_t offset);
size_t pipe_write(File *file, char *buf, size_t size, off_t offset);
int pipe_release(File *file);
/* Operations for the write end of a pipe: it can be written and released */
FileOps pipeWriteFileOps = {
.write = pipe_write,
.release = pipe_release,
};
/* Operations for the read end of a pipe: it can be read and released */
FileOps pipeReadFileOps = {
.read = pipe_read,
.release = pipe_release,
};
/*
 * Tear-down hook for the Pipe hanging off an inode: every task still parked
 * on either wait queue is made runnable again and sent SIGPIPE.  Readers are
 * drained first, then writers.  The Pipe allocation itself is not released
 * here.
 */
static void
pipe_free_extra(Inode *inode)
{
	Pipe *pipe = inode->extra;
	TaskQueue *queues[2] = { &pipe->readQueue, &pipe->writeQueue };
	Task *waiter;
	int q;

	for (q = 0; q < 2; q++) {
		for (;;) {
			waiter = pop_from_queue(queues[q]);
			if (waiter == NULL)
				break;
			unblock_task(waiter);
			send_sig(waiter, SIGPIPE);
		}
	}
}
/*
 * Open an anonymous pipe.
 *
 * pipefd: caller-supplied array of two ints.  On success pipefd[0] holds the
 *         descriptor of the read end and pipefd[1] the descriptor of the
 *         write end.
 *
 * Returns 0 on success, -EFAULT if pipefd cannot be written by the caller,
 * or -EMFILE if the current task has fewer than two free descriptor slots.
 */
int
pipe(int pipefd[2])
{
	if (!verify_access(pipefd, sizeof(int) * 2, PROT_WRITE))
		return -EFAULT;

	/* Both descriptors must be available before anything is allocated */
	int freefd = 0, i;
	for (i = 0; i < NFILES; i++)
		freefd += !current->files->fd[i];
	if (freefd < 2)
		return -EMFILE;

	/* Create endpoints */
	File *file[2];
	Pipe *pipe = kmalloc(sizeof(Pipe));
	Inode *inode = kmalloc(sizeof(Inode));

	/*
	 * The ring buffer must start out empty (size, head and tail all zero)
	 * and with no queued waiters; do not rely on kmalloc() returning
	 * zeroed memory for that.
	 * NOTE(review): assumes an all-zero TaskQueue is an empty queue, and
	 * the Inode and File objects are still used as kmalloc() returned
	 * them - confirm against the heap and queue implementations.
	 */
	memset(pipe, 0, sizeof(Pipe));

	for (i = 0; i < 2; i++) {
		file[i] = kmalloc(sizeof(File));

		/* Install the file in the lowest free descriptor slot */
		for (pipefd[i] = 0; pipefd[i] < NFILES; pipefd[i]++)
			if (!current->files->fd[pipefd[i]])
				break;
		current->files->fd[pipefd[i]] = file[i];

		/* Build files */
		file[i]->inode = inode_get(inode);
		file[i]->mode = S_IFIFO | 0666;
		file[i]->uid = current->uid;
		file[i]->gid = current->gid;
		file[i]->usage = 1;
	}

	/* Index 0 is the read end, index 1 the write end */
	file[0]->ops = &pipeReadFileOps;
	file[1]->ops = &pipeWriteFileOps;

	inode->extra = pipe;
	pipe->readers = pipe->writers = 1;

	/* Back the ring buffer with one freshly allocated frame at offset 0 */
	page_create(inode, alloc_frames(1), 0);

	return 0;
}
/*
 * Read from a pipe.
 *
 * file:   read end of the pipe
 * buf:    destination for the bytes
 * size:   number of bytes requested
 * offset: ignored - a pipe has no seek position
 *
 * Copies bytes out of the 4096-byte ring buffer, blocking whenever it runs
 * empty while a writer still exists.  The call finishes once `size` bytes
 * have been copied, or the buffer is empty and no writer is left.
 *
 * Returns the number of bytes copied into buf.  If the pipe is already
 * empty with no writers on entry, the caller is sent SIGPIPE and 0 is
 * returned.
 */
size_t
pipe_read(File *file, char *buf, size_t size, off_t offset)
{
	(void) offset;
	Pipe *pipe = file->inode->extra;

	/*
	 * Only give up immediately when there is nothing left to drain:
	 * bytes written before the last writer closed must still reach the
	 * reader.  The loop below stops by itself once they are consumed.
	 */
	if (!pipe->writers && !pipe->size) {
		send_sig(current, SIGPIPE);
		return 0;
	}

	Task *tmp;
	Page *page = page_find(file->inode, 0);

	/* Map the buffer frame through the quick page, remembering the old mapping */
	acquire(&quickPageLock);
	page_t oldPage = quick_page(page->frame);

	size_t count = 0, avail, min, minPart;
	while (size) {
		avail = pipe->size;
		if (!avail) {
			/* Never sleep while holding the quick page */
			quick_page(oldPage);
			release(&quickPageLock);
			if (!pipe->writers)
				goto end;
			/* Let blocked writers refill the buffer, then wait */
			while ((tmp = pop_from_queue(&pipe->writeQueue)) != NULL)
				unblock_task(tmp);
			add_to_queue(&pipe->readQueue, current);
			block_task(WAITING_FOR_IO);
			acquire(&quickPageLock);
			oldPage = quick_page(page->frame);
		}

		/* Re-read the fill level: it may have changed while blocked */
		avail = pipe->size;
		min = (size < avail) ? size : avail;
		if (pipe->tail + min >= 4096) {
			/* The run wraps: copy up to the end, then from the start */
			minPart = 4096 - pipe->tail;
			memcpy(buf + count, QUICK_PAGE + pipe->tail, minPart);
			memcpy(buf + count + minPart, QUICK_PAGE,
			       min - minPart);
		} else {
			memcpy(buf + count, QUICK_PAGE + pipe->tail, min);
		}
		size -= min;
		count += min;
		pipe->tail += min;
		pipe->tail %= 4096;
		pipe->size -= min;
	}
	quick_page(oldPage);
	release(&quickPageLock);

end:
	/* Space has been freed, so wake any writers waiting for room */
	while ((tmp = pop_from_queue(&pipe->writeQueue)) != NULL)
		unblock_task(tmp);
	return count;
}
/*
 * Write to a pipe.
 *
 * file:   write end of the pipe
 * buf:    bytes to write
 * size:   number of bytes to write
 * offset: ignored - a pipe has no seek position
 *
 * Copies bytes into the 4096-byte ring buffer, blocking whenever it is full
 * while a reader still exists.  The call finishes once all `size` bytes have
 * been stored, or the buffer is full and no reader is left to drain it.
 *
 * Returns the number of bytes stored.  If there is no reader on entry, the
 * caller is sent SIGPIPE and 0 is returned.
 */
size_t
pipe_write(File *file, char *buf, size_t size, off_t offset)
{
	(void) offset;
	Pipe *pipe = file->inode->extra;
	if (!pipe->readers) {
		send_sig(current, SIGPIPE);
		return 0;
	}

	Task *tmp;
	Page *page = page_find(file->inode, 0);

	/* Map the buffer frame through the quick page, remembering the old mapping */
	acquire(&quickPageLock);
	page_t oldPage = quick_page(page->frame);

	size_t count = 0, space, min, minPart;
	while (size) {
		space = 4096 - pipe->size;
		if (!space) {
			/* Never sleep while holding the quick page */
			quick_page(oldPage);
			release(&quickPageLock);
			/*
			 * With no reader left nothing will ever make room,
			 * so stop here with a short count instead of
			 * sleeping forever (mirrors the writer check in
			 * pipe_read()).
			 */
			if (!pipe->readers)
				goto end;
			/* Let blocked readers drain the buffer, then wait */
			while ((tmp = pop_from_queue(&pipe->readQueue)) != NULL)
				unblock_task(tmp);
			add_to_queue(&pipe->writeQueue, current);
			block_task(WAITING_FOR_IO);
			acquire(&quickPageLock);
			oldPage = quick_page(page->frame);
		}

		/* Re-read the free space: it may have changed while blocked */
		space = 4096 - pipe->size;
		min = (size < space) ? size : space;
		if (pipe->head + min >= 4096) {
			/* The run wraps: fill up to the end, then from the start */
			minPart = 4096 - pipe->head;
			memcpy(QUICK_PAGE + pipe->head, buf + count, minPart);
			memcpy(QUICK_PAGE, buf + count + minPart,
			       min - minPart);
		} else {
			memcpy(QUICK_PAGE + pipe->head, buf + count, min);
		}
		size -= min;
		count += min;
		pipe->head += min;
		pipe->head %= 4096;
		pipe->size += min;
	}
	quick_page(oldPage);
	release(&quickPageLock);

end:
	/* Data is available, so wake any readers waiting for it */
	while ((tmp = pop_from_queue(&pipe->readQueue)) != NULL)
		unblock_task(tmp);
	return count;
}
/*
 * Release one end of a pipe.
 *
 * file: the pipe endpoint being released; which end it is gets decided by
 *       comparing file->ops with the read-end operations table.
 *
 * Drops the reader or writer count for that end, then wakes every task
 * blocked on the opposite end and sends each of them SIGPIPE.
 *
 * Returns 0.
 */
int
pipe_release(File *file)
{
	Task *tmp;
	Pipe *pipe = file->inode->extra;
	TaskQueue *queue;

	/*
	 * Update the count before waking anybody, so a woken task that
	 * re-checks readers/writers sees the new value and does not go back
	 * to sleep on an endpoint that no longer exists.
	 */
	if (file->ops == &pipeReadFileOps) {
		pipe->readers--;
		queue = &pipe->writeQueue;
	} else {
		pipe->writers--;
		queue = &pipe->readQueue;
	}

	/*
	 * NOTE(review): waiters are signalled even if other endpoints of the
	 * same kind remain open - confirm whether that is intended once a
	 * pipe can have more than one reader or writer.
	 */
	while ((tmp = pop_from_queue(queue)) != NULL) {
		unblock_task(tmp);
		send_sig(tmp, SIGPIPE);
	}

	return 0;
}