/* * libhdfs engine * * this engine helps perform read/write operations on hdfs cluster using * libhdfs. hdfs doesnot support modification of data once file is created. * * so to mimic that create many files of small size (e.g 256k), and this * engine select a file based on the offset generated by fio. * * thus, random reads and writes can also be achieved with this logic. * */ #include #include #include "../fio.h" #include "../optgroup.h" #define CHUNCK_NAME_LENGTH_MAX 80 #define CHUNCK_CREATION_BUFFER_SIZE 65536 struct hdfsio_data { hdfsFS fs; hdfsFile fp; uint64_t curr_file_id; }; struct hdfsio_options { void *pad; /* needed because offset can't be 0 for a option defined used offsetof */ char *host; char *directory; unsigned int port; unsigned int chunck_size; unsigned int single_instance; unsigned int use_direct; }; static struct fio_option options[] = { { .name = "namenode", .lname = "hfds namenode", .type = FIO_OPT_STR_STORE, .off1 = offsetof(struct hdfsio_options, host), .def = "localhost", .help = "Namenode of the HDFS cluster", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "hostname", .lname = "hfds namenode", .type = FIO_OPT_STR_STORE, .off1 = offsetof(struct hdfsio_options, host), .def = "localhost", .help = "Namenode of the HDFS cluster", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "port", .lname = "hdfs namenode port", .type = FIO_OPT_INT, .off1 = offsetof(struct hdfsio_options, port), .def = "9000", .minval = 1, .maxval = 65535, .help = "Port used by the HDFS cluster namenode", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "hdfsdirectory", .lname = "hfds directory", .type = FIO_OPT_STR_STORE, .off1 = offsetof(struct hdfsio_options, directory), .def = "/", .help = "The HDFS directory where fio will create chuncks", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "chunk_size", .alias = "chunck_size", .lname = "Chunk size", .type = FIO_OPT_INT, .off1 = offsetof(struct hdfsio_options, chunck_size), .def = "1048576", .help = "Size of individual chunck", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "single_instance", .lname = "Single Instance", .type = FIO_OPT_BOOL, .off1 = offsetof(struct hdfsio_options, single_instance), .def = "1", .help = "Use a single instance", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = "hdfs_use_direct", .lname = "HDFS Use Direct", .type = FIO_OPT_BOOL, .off1 = offsetof(struct hdfsio_options, use_direct), .def = "0", .help = "Use readDirect instead of hdfsRead", .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_HDFS, }, { .name = NULL, }, }; static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); } static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) { struct hdfsio_options *options = td->eo; struct hdfsio_data *hd = td->io_ops_data; unsigned long f_id; char fname[CHUNCK_NAME_LENGTH_MAX]; int open_flags; /* find out file id based on the offset generated by fio */ f_id = floor(io_u->offset / options-> chunck_size); if (f_id == hd->curr_file_id) { /* file is already open */ return 0; } if (hd->curr_file_id != -1) { if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { log_err("hdfs: unable to close file: %s\n", strerror(errno)); return errno; } hd->curr_file_id = -1; } if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { open_flags = O_RDONLY; } else if (io_u->ddir == DDIR_WRITE) { open_flags = O_WRONLY; } else { log_err("hdfs: Invalid I/O Operation\n"); return 0; } get_chunck_name(fname, io_u->file->file_name, f_id); hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, options->chunck_size); if(hd->fp == NULL) { log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); return errno; } hd->curr_file_id = f_id; return 0; } static enum fio_q_status fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) { struct hdfsio_data *hd = td->io_ops_data; struct hdfsio_options *options = td->eo; int ret; unsigned long offset; offset = io_u->offset % options->chunck_size; if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno)); io_u->error = errno; return FIO_Q_COMPLETED; }; // do the IO if (io_u->ddir == DDIR_READ) { if (options->use_direct) { ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); } else { ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); } } else if (io_u->ddir == DDIR_WRITE) { ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); } else if (io_u->ddir == DDIR_SYNC) { ret = hdfsFlush(hd->fs, hd->fp); } else { log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); ret = EINVAL; } // Check if the IO went fine, or is incomplete if (ret != (int)io_u->xfer_buflen) { if (ret >= 0) { io_u->resid = io_u->xfer_buflen - ret; io_u->error = 0; return FIO_Q_COMPLETED; } else { io_u->error = errno; } } if (io_u->error) td_verror(td, io_u->error, "xfer"); return FIO_Q_COMPLETED; } int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) { if (td->o.odirect) { td->error = EINVAL; return 0; } return 0; } int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) { struct hdfsio_data *hd = td->io_ops_data; if (hd->curr_file_id != -1) { if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { log_err("hdfs: unable to close file: %s\n", strerror(errno)); return errno; } hd->curr_file_id = -1; } return 0; } static int fio_hdfsio_init(struct thread_data *td) { struct hdfsio_options *options = td->eo; struct hdfsio_data *hd = td->io_ops_data; struct fio_file *f; uint64_t j,k; int i, failure = 0; uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; uint64_t bytes_left; char fname[CHUNCK_NAME_LENGTH_MAX]; hdfsFile fp; hdfsFileInfo *fi; tOffset fi_size; for_each_file(td, f, i) { k = 0; for(j=0; j < f->real_file_size; j += options->chunck_size) { get_chunck_name(fname, f->file_name, k++); fi = hdfsGetPathInfo(hd->fs, fname); fi_size = fi ? fi->mSize : 0; // fill exist and is big enough, nothing to do if( fi && fi_size >= options->chunck_size) { continue; } fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, options->chunck_size); if(fp == NULL) { failure = errno; log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); break; } bytes_left = options->chunck_size; memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) != CHUNCK_CREATION_BUFFER_SIZE) { failure = errno; log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); break; }; bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; } if(bytes_left > 0) { if( hdfsWrite(hd->fs, fp, buffer, bytes_left) != bytes_left) { failure = errno; break; }; } if( hdfsCloseFile(hd->fs, fp) != 0) { failure = errno; log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); break; } } if(failure) { break; } } if( !failure ) { fio_file_set_size_known(f); } return failure; } static int fio_hdfsio_setup(struct thread_data *td) { struct hdfsio_data *hd; struct fio_file *f; int i; uint64_t file_size, total_file_size; if (!td->io_ops_data) { hd = malloc(sizeof(*hd)); memset(hd, 0, sizeof(*hd)); hd->curr_file_id = -1; td->io_ops_data = hd; } total_file_size = 0; file_size = 0; for_each_file(td, f, i) { if(!td->o.file_size_low) { file_size = floor(td->o.size / td->o.nr_files); total_file_size += file_size; } else if (td->o.file_size_low == td->o.file_size_high) file_size = td->o.file_size_low; else { file_size = get_rand_file_size(td); } f->real_file_size = file_size; } /* If the size doesn't divide nicely with the chunck size, * make the last files bigger. * Used only if filesize was not explicitely given */ if (!td->o.file_size_low && total_file_size < td->o.size) { f->real_file_size += (td->o.size - total_file_size); } return 0; } static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) { struct hdfsio_data *hd = td->io_ops_data; struct hdfsio_options *options = td->eo; int failure; struct hdfsBuilder *bld; if (options->host == NULL || options->port == 0) { log_err("hdfs: server not defined\n"); return EINVAL; } bld = hdfsNewBuilder(); if (!bld) { failure = errno; log_err("hdfs: unable to allocate connect builder\n"); return failure; } hdfsBuilderSetNameNode(bld, options->host); hdfsBuilderSetNameNodePort(bld, options->port); if(! options->single_instance) { hdfsBuilderSetForceNewInstance(bld); } hd->fs = hdfsBuilderConnect(bld); /* hdfsSetWorkingDirectory succeed on non existend directory */ if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { failure = errno; log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); return failure; } return 0; } static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) { struct hdfsio_data *hd = td->io_ops_data; if (hd->fs && hdfsDisconnect(hd->fs) < 0) { log_err("hdfs: disconnect failed: %d\n", errno); } } static struct ioengine_ops ioengine_hdfs = { .name = "libhdfs", .version = FIO_IOOPS_VERSION, .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, .setup = fio_hdfsio_setup, .init = fio_hdfsio_init, .prep = fio_hdfsio_prep, .queue = fio_hdfsio_queue, .open_file = fio_hdfsio_open_file, .close_file = fio_hdfsio_close_file, .io_u_init = fio_hdfsio_io_u_init, .io_u_free = fio_hdfsio_io_u_free, .option_struct_size = sizeof(struct hdfsio_options), .options = options, }; static void fio_init fio_hdfsio_register(void) { register_ioengine(&ioengine_hdfs); } static void fio_exit fio_hdfsio_unregister(void) { unregister_ioengine(&ioengine_hdfs); }