BarryServer : Git

All the code for all my projects
// BarryServer : Git / Orion / blob / e59e4fe0bbf5a3f56db0700ee49a81131b590f9c / vfs / pipe.c

// Related

Orion

Barry Adding pipes e59e4fe (2 years, 4 months ago)
/*
 * This file implements the VFS pipe mechanism.  It supports both named and
 * anonymous pipes (FIFOs and pipes), and uses the same underlying operations
 * for both.  It uses a ring buffer to implement the reading/writing.
 */

#include <stdint.h>
#include <stddef.h>
#include <string.h>
#include <errno.h>
#include "vfs.h"
#include "cache.h"
#include "inode.h"
#include "../mem/heap.h"
#include "../mem/paging.h"
#include "../mem/frame.h"
#include "../task/task.h"

/* Structure for pipe data */
typedef struct Pipe {
	size_t size;
	off_t head, tail;
	size_t readers, writers;
	TaskQueue readQueue, writeQueue;
} Pipe;

size_t pipe_read(File *file, char *buf, size_t size, off_t offset);
size_t pipe_write(File *file, char *buf, size_t size, off_t offset);
int pipe_release(File *file);

FileOps pipeWriteFileOps = {
	.write = pipe_write,
	.release = pipe_release,
};
FileOps pipeReadFileOps = {
	.read = pipe_read,
	.release = pipe_release,
};

/* Free a Pipe */
static void
pipe_free_extra(Inode *inode)
{
	Pipe *pipe = inode->extra;

	/* Resume waiting tasks */
	Task *task;
	while ((task = pop_from_queue(&pipe->readQueue)) != NULL) {
		unblock_task(task);
		send_sig(task, SIGPIPE);
	}
	while ((task = pop_from_queue(&pipe->writeQueue)) != NULL) {
		unblock_task(task);
		send_sig(task, SIGPIPE);
	}
}

/* Open an anonymous pipe */
int
pipe(int pipefd[2])
{
	if (!verify_access(pipefd, sizeof(int) * 2, PROT_WRITE))
		return -EFAULT;

	/* Allocate file descriptors */
	int freefd = 0, i;
	for (i = 0; i < NFILES; i++)
		freefd += !current->files->fd[i];
	if (freefd < 2)
		return -EMFILE;

	/* Create endpoints */
	File *file[2];
	Pipe *pipe = kmalloc(sizeof(Pipe));
	Inode *inode = kmalloc(sizeof(Inode));
	for (i = 0; i < 2; i++) {
		file[i] = kmalloc(sizeof(File));

		/* Install file descriptors */
		for (pipefd[i] = 0; pipefd[i] < NFILES; pipefd[i]++)
			if (!current->files->fd[pipefd[i]])
				break;
		current->files->fd[pipefd[i]] = file[i];

		/* Build files */
		file[i]->inode = inode_get(inode);
		file[i]->mode = S_IFIFO | 0666;
		file[i]->uid = current->uid;
		file[i]->gid = current->gid;
		file[i]->usage = 1;
	}
	file[0]->ops = &pipeReadFileOps;
	file[1]->ops = &pipeWriteFileOps;
	inode->extra = pipe;
	pipe->readers = pipe->writers = 1;
	page_create(inode, alloc_frames(1), 0);
}

/* Read from a pipe */
size_t
pipe_read(File *file, char *buf, size_t size, off_t offset)
{
	offset %= 4096;

	Pipe *pipe = file->inode->extra;
	if (!pipe->writers) {
		send_sig(current, SIGPIPE);
		return 0;
	}

	Task *tmp;
	Page *page = page_find(file->inode, 0);
	acquire(&quickPageLock);
	page_t oldPage = quick_page(page->frame);
	size_t count = 0, avail, min, minPart;
	while (size) {
		avail = pipe->size;
		if (!avail) {
			quick_page(oldPage);
			release(&quickPageLock);

			if (!pipe->writers)
				goto end;

			while (tmp = pop_from_queue(&pipe->writeQueue))
				unblock_task(tmp);
			add_to_queue(&pipe->readQueue, current);
			block_task(WAITING_FOR_IO);

			acquire(&quickPageLock);
			oldPage = quick_page(page->frame);
		}
		avail = pipe->size;
		min = (size < avail) ? size : avail;
		if (pipe->tail + min >= 4096) {
			minPart = 4096 - pipe->tail;
			memcpy(buf + count, QUICK_PAGE + pipe->tail, minPart);
			memcpy(buf + count + minPart, QUICK_PAGE,
			       min - minPart);
		} else {
			memcpy(buf + count, QUICK_PAGE + pipe->tail, min);
		}

		size -= min;
		count += min;
		pipe->tail += min;
		pipe->tail %= 4096;
		pipe->size -= min;
	}

	quick_page(oldPage);
	release(&quickPageLock);

end:
	while (tmp = pop_from_queue(&pipe->writeQueue))
		unblock_task(tmp);

	return count;
}

/* Write to a pipe */
size_t
pipe_write(File *file, char *buf, size_t size, off_t offset)
{
	offset %= 4096;

	Pipe *pipe = file->inode->extra;
	if (!pipe->readers) {
		send_sig(current, SIGPIPE);
		return 0;
	}

	Task *tmp;
	Page *page = page_find(file->inode, 0);
	acquire(&quickPageLock);
	page_t oldPage = quick_page(page->frame);
	size_t count = 0, space, min, minPart;
	while (size) {
		space = 4096 - pipe->size;
		if (!space) {
			quick_page(oldPage);
			release(&quickPageLock);

			while (tmp = pop_from_queue(&pipe->readQueue))
				unblock_task(tmp);
			add_to_queue(&pipe->writeQueue, current);
			block_task(WAITING_FOR_IO);

			acquire(&quickPageLock);
			oldPage = quick_page(page->frame);
		}
		space = 4096 - pipe->size;
		min = (size < space) ? size : space;
		if (pipe->head + min >= 4096) {
			minPart = 4096 - pipe->head;
			memcpy(QUICK_PAGE + pipe->head, buf + count, minPart);
			memcpy(QUICK_PAGE, buf + count + minPart,
			       min - minPart);
		} else {
			memcpy(QUICK_PAGE + pipe->head, buf + count, min);
		}

		size -= min;
		count += min;
		pipe->head += min;
		pipe->head %= 4096;
		pipe->size += min;
	}

	quick_page(oldPage);
	release(&quickPageLock);

	while (tmp = pop_from_queue(&pipe->readQueue))
		unblock_task(tmp);

	return count;
}

int
pipe_release(File *file)
{
	Task *tmp;
	Pipe *pipe = file->inode->extra;
	TaskQueue *queue = (file->ops == &pipeReadFileOps)
	                 ? &pipe->writeQueue : &pipe->readQueue;
	while (tmp = pop_from_queue(queue)) {
		unblock_task(tmp);
		send_sig(tmp, SIGPIPE);
	}
	if (queue == &pipe->readQueue)
		pipe->writers--;
	else
		pipe->readers--;
}