[odu] Add blob (blobfs) target support: blob IO generator, blob target, and operation-id plumbing

Change-Id: I5fd4ee2702a7c9d0818dda4a70620b1c33be90e3
diff --git a/garnet/bin/odu/BUILD.gn b/garnet/bin/odu/BUILD.gn
index d952242..76dcc1f 100644
--- a/garnet/bin/odu/BUILD.gn
+++ b/garnet/bin/odu/BUILD.gn
@@ -18,6 +18,7 @@
     "//garnet/public/rust/fuchsia-app",
     "//garnet/public/rust/fuchsia-async",
     "//garnet/public/rust/fuchsia-zircon",
+    "//garnet/public/rust/fuchsia-merkle",
     "//third_party/rust-crates/rustc_deps:failure",
     "//third_party/rust-crates/rustc_deps:serde",
     "//third_party/rust-crates/rustc_deps:serde_derive",
diff --git a/garnet/bin/odu/src/blob_io_generator.rs b/garnet/bin/odu/src/blob_io_generator.rs
new file mode 100644
index 0000000..390d6dc9
--- /dev/null
+++ b/garnet/bin/odu/src/blob_io_generator.rs
@@ -0,0 +1,121 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! BlobIoGenerator generates IO load for blobfs. Blobs are special in the sense
+//! that
+//! - they are write once.
+//! - the file name describes the content of the file.
+//! - writer needs to be aware of the entire content of the file.
+//! - though they allow issuing writes at different offsets,
+//!   but the size of the file should be set right after creation
+//!   and before first write is issued.
+//! - all blobs are in root directory. Subdirectories are not allowed.
+//! - they are written mostly sequentially.
+
+use {
+    crate::generator::Generator,
+    crate::sequential_io_generator::SequentialIoGenerator,
+    crate::operations::OperationType,
+    byteorder::{ByteOrder, LittleEndian},
+    std::{io::Cursor, io::Read, io::Write, mem, ops::Range},
+};
+
+/// Lifecycle stages of a blob IO sequence. Blobs are write-once, so the
+/// generator walks these stages in a fixed cycle.
+// Bug fix: stages are compared with `==` in the Generator impl, which
+// requires PartialEq. The non-camel-case variant names are kept so other
+// code in this change that references them keeps compiling.
+#[allow(non_camel_case_types)]
+#[derive(PartialEq)]
+enum BlobIoStages {
+    Start,
+    Open_Or_Create,
+    SetSize,
+    Read_Or_Write,
+    Close_Or_Delete,
+}
+
+/// Generates blob-aware IO load by wrapping a SequentialIoGenerator and
+/// gating its output through the blob lifecycle stages.
+pub struct BlobIoGenerator {
+    // Underlying generator used for offsets, sizes and buffer contents.
+    sequential_io_generator: SequentialIoGenerator,
+    // Current stage in the blob lifecycle state machine.
+    stage: BlobIoStages,
+    // Total size of the blob. NOTE(review): never assigned anywhere in this
+    // file — confirm it is initialized before get_io_range() reads it.
+    blob_size: u64,
+}
+
+impl BlobIoGenerator {
+    /// Creates a new BlobIoGenerator that delegates offset/content
+    /// generation to a SequentialIoGenerator over `offset_range`.
+    pub fn new(
+        magic_number: u64,
+        process_id: u64,
+        fd_unique_id: u64,
+        generator_unique_id: u64,
+        offset_range: &Range<u64>,
+        block_size: u64,
+        max_io_size: u64,
+        align: bool,
+    ) -> BlobIoGenerator {
+        BlobIoGenerator {
+            sequential_io_generator: SequentialIoGenerator::new(
+                magic_number,
+                process_id,
+                fd_unique_id,
+                generator_unique_id,
+                offset_range,
+                block_size,
+                max_io_size,
+                align,
+            ),
+            stage: BlobIoStages::Start,
+            // Bug fix: `blob_size` was missing from this struct literal,
+            // which fails to compile. The blob spans the assigned offset
+            // range — TODO(review): confirm against the caller.
+            blob_size: offset_range.end - offset_range.start,
+        }
+    }
+
+    /// Size of a single IO, delegated to the sequential generator.
+    fn io_size(&self) -> u64 {
+        self.sequential_io_generator.io_size()
+    }
+}
+
+/// Returns true if `to_find` is present in `operations`.
+// Bug fixes: `Ven` -> `Vec`, `while op in` (not Rust) -> `for op in`,
+// missing `-> bool` return type, and missing `return` inside the loop.
+fn operation_has(operations: &Vec<OperationType>, to_find: OperationType) -> bool {
+    for op in operations {
+        if *op == to_find {
+            return true;
+        }
+    }
+    false
+}
+
+// Bug fixes: this impl was declared `for SequentialIoGenerator` (which has
+// its own Generator impl elsewhere) while using BlobIoGenerator state;
+// two branches were missing `else`; generate_number discarded the
+// generated value and returned nothing; get_io_range discarded the
+// delegated range; fill_buffer lacked the `operation_id` parameter added
+// to the Generator trait in this same change.
+impl Generator for BlobIoGenerator {
+    /// Advances the blob lifecycle state machine and returns the next
+    /// number from the underlying sequential generator.
+    fn generate_number(&mut self) -> u64 {
+        let number = self.sequential_io_generator.generate_number();
+        if self.stage == BlobIoStages::Start || self.stage == BlobIoStages::Close_Or_Delete {
+            self.stage = BlobIoStages::SetSize;
+        } else if self.stage == BlobIoStages::SetSize {
+            self.stage = BlobIoStages::Open_Or_Create;
+        } else if self.stage == BlobIoStages::Open_Or_Create {
+            self.stage = BlobIoStages::Read_Or_Write;
+        } else if self.stage == BlobIoStages::Read_Or_Write {
+            // We move to the next phase if we are done writing the blob
+            // completely. NOTE(review): the original used `range.start == 0`
+            // as the "done" condition — confirm this matches the sequential
+            // generator's wrap-around behavior.
+            let range = self.get_io_range();
+            if range.start == 0 {
+                self.stage = BlobIoStages::Close_Or_Delete;
+            }
+        }
+        number
+    }
+
+    /// Maps the current stage to the operation to issue. Blobs are
+    /// write-once, so Create replaces Open and Write replaces Read; the
+    /// asserts check the caller's allowed set reflects that.
+    fn get_io_operation(&self, allowed_ops: &Vec<OperationType>) -> OperationType {
+        if self.stage == BlobIoStages::Open_Or_Create {
+            assert_eq!(operation_has(allowed_ops, OperationType::Open), false);
+            assert_eq!(operation_has(allowed_ops, OperationType::Create), true);
+            return OperationType::Create;
+        } else if self.stage == BlobIoStages::SetSize {
+            return OperationType::Truncate;
+        } else if self.stage == BlobIoStages::Read_Or_Write {
+            assert_eq!(operation_has(allowed_ops, OperationType::Read), false);
+            assert_eq!(operation_has(allowed_ops, OperationType::Write), true);
+            return OperationType::Write;
+        } else if self.stage == BlobIoStages::Close_Or_Delete {
+            assert_eq!(operation_has(allowed_ops, OperationType::Delete), false);
+            assert_eq!(operation_has(allowed_ops, OperationType::Close), true);
+            return OperationType::Close;
+        } else {
+            // All stages are covered above; reaching here is a bug.
+            unreachable!("invalid blob IO stage");
+        }
+    }
+
+    /// Returns the range for the next IO: the sequential range while
+    /// writing, otherwise the whole blob (used by truncate).
+    fn get_io_range(&self) -> Range<u64> {
+        if self.stage == BlobIoStages::Read_Or_Write {
+            // Bug fix: the delegated range was computed but not returned.
+            return self.sequential_io_generator.get_io_range();
+        }
+        0..self.blob_size
+    }
+
+    /// Fills `buf` with generated data. Blobs are write-once, so the
+    /// buffer is filled only once, at create time.
+    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, operation_id: u64, offset_range: &Range<u64>) {
+        if self.stage == BlobIoStages::Open_Or_Create {
+            self.sequential_io_generator.fill_buffer(buf, sequence_number, operation_id, offset_range);
+        }
+    }
+}
diff --git a/garnet/bin/odu/src/blob_target.rs b/garnet/bin/odu/src/blob_target.rs
new file mode 100644
index 0000000..aca24cc
--- /dev/null
+++ b/garnet/bin/odu/src/blob_target.rs
@@ -0,0 +1,395 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! File implements IoPacket and Target for POSIX File like objects. Current
+//! implementation limits itself to blocking calls.
+
+use {
+    crate::io_packet::{IoPacket, IoPacketType, TimeInterval},
+    crate::operations::TargetType,
+    crate::operations::{OperationType, PipelineStages, Target, TargetOps},
+    libc::{c_void, pwrite},
+    log::debug,
+    log::error,
+    std::{
+        fs::{File, OpenOptions},
+        io::{Error, ErrorKind, Result},
+        ops::Range,
+        os::unix::io::AsRawFd,
+        process,
+        sync::Arc,
+        time::Instant,
+    },
+};
+
+#[derive(Clone)]
+pub struct BlobIoPacket {
+    // io_sequence_number is a monotonically increasing number which doesn't
+    // repeat for this run and for this generator. It is used to
+    // order-replay load.
+    io_sequence_number: u64,
+
+    // This value represents the seed used to generate the contents of the
+    // IoPacket.
+    seed: u64,
+
+    // Start and finish timestamps for different stages of IO.
+    stage_timestamps: [TimeInterval; PipelineStages::stage_count()],
+
+    // Type of IO operation
+    operation_type: OperationType,
+
+    // Range within the blob on which this IO will be performed. May not be
+    // applicable to all operations, e.g. create.
+    offset_range: Range<u64>,
+
+    // Result of the completed IO operation
+    io_result: Option<ErrorKind>,
+
+    // The target (blob) on which IO will be performed
+    target: TargetType,
+
+    // Payload of the IO
+    buffer: Vec<u8>,
+}
+
+impl BlobIoPacket {
+    /// Builds a zero-filled IoPacket describing one blob operation over
+    /// `offset_range` on `target`.
+    pub fn new(
+        operation_type: OperationType,
+        seq: u64,
+        seed: u64,
+        offset_range: &Range<u64>,
+        target: &TargetType,
+    ) -> BlobIoPacket {
+        let payload_len = (offset_range.end - offset_range.start) as usize;
+        BlobIoPacket {
+            operation_type,
+            io_sequence_number: seq,
+            seed,
+            stage_timestamps: [TimeInterval::new(); PipelineStages::stage_count()],
+            offset_range: offset_range.clone(),
+            io_result: None,
+            target: target.clone(),
+            // Zero-filled payload sized to the IO range.
+            buffer: vec![0u8; payload_len],
+        }
+    }
+}
+
+impl IoPacket for BlobIoPacket {
+    /// Operation this packet performs.
+    fn operation_type(&self) -> OperationType {
+        self.operation_type.clone()
+    }
+
+    /// Records the start time for `stage`.
+    fn timestamp_stage_start(&mut self, stage: &PipelineStages) {
+        self.stage_timestamps[stage.stage_number()].start();
+    }
+
+    /// Records the end time for `stage`.
+    fn timestamp_stage_end(&mut self, stage: &PipelineStages) {
+        self.stage_timestamps[stage.stage_number()].end();
+    }
+
+    fn sequence_number(&self) -> &u64 {
+        &self.io_sequence_number
+    }
+
+    fn stage_duration(&self, stage: &PipelineStages) -> u128 {
+        self.stage_timestamps[stage.stage_number()].duration()
+    }
+
+    fn interval_to_u64(&self, stage: &PipelineStages) -> (u64, u64) {
+        self.stage_timestamps[stage.stage_number()].interval_to_u64(&self.target.start_instant())
+    }
+
+    fn io_offset_range(&self) -> &Range<u64> {
+        &self.offset_range
+    }
+
+    // For each delegated call below, the target Arc is cloned first so the
+    // shared reference is not held while `self` is borrowed mutably.
+    fn do_io(&mut self) {
+        self.target.clone().do_io(self)
+    }
+
+    fn is_complete(&self) -> bool {
+        self.target.clone().is_complete(self)
+    }
+
+    fn verify_needs_io(&self) -> bool {
+        self.target.clone().verify_needs_io(self)
+    }
+
+    fn generate_verify_io(&mut self) {
+        self.target.clone().generate_verify_io(self)
+    }
+
+    fn verify(&mut self, verify_packet: &IoPacket) -> bool {
+        self.target.clone().verify(self, verify_packet)
+    }
+
+    /// Maps the stored result kind back into a Result.
+    fn get_error(&self) -> Result<()> {
+        match self.io_result {
+            None => Ok(()),
+            Some(kind) => Err(Error::new(kind, "something went wrong")),
+        }
+    }
+
+    fn set_error(&mut self, io_error: Error) {
+        self.io_result = Some(io_error.kind());
+    }
+
+    fn buffer_mut(&mut self) -> &mut Vec<u8> {
+        &mut self.buffer
+    }
+
+    fn buffer(&mut self) -> &Vec<u8> {
+        &self.buffer
+    }
+}
+
+pub struct BlobBlockingTarget {
+    /// Base directory under which blobs are created/opened; joined with
+    /// `blob_name` to form the full path (the old comment said "File name").
+    target_base_directory: String,
+
+    /// Set of [supported] operations
+    ops: TargetOps,
+
+    /// Open file descriptor. NOTE(review): never assigned by open()/create()
+    /// in this file — confirm it is populated before write() unwraps it.
+    file: Option<File>,
+
+    /// Unique file id for this run and for this instance of blob
+    target_unique_id: u64,
+
+    /// Range within which this Target operates on the file
+    offset_range: Range<u64>,
+
+    /// Time base used to convert stage timestamps into u64 pairs.
+    start_instant: Instant,
+
+    /// Blob contents; currently unused.
+    _blob_data: Option<Arc<Box<[u8]>>>,
+
+    /// Merkle-root file name of the blob, relative to the base directory.
+    blob_name: Option<String>,
+}
+
+impl BlobBlockingTarget {
+    // Create a new Target instance. Fails when opening an existing file fails.
+    // TODO(auradkar): Open should be moved to setup phase when all operations
+    // file are supported.
+    pub fn new(
+        target_base_directory: String,
+        target_base_id: u64,
+        offset_range: &Range<u64>,
+        start_instant: &Instant,
+    ) -> Result<TargetType> {
+        let ops = TargetOps {
+            write: Some(OperationType::Write),
+            truncate: Some(OperationType::Truncate),
+            open: Some(OperationType::Open),
+            create: Some(OperationType::Create),
+        };
+        Ok(Arc::new(Box::new(BlobBlockingTarget {
+            target_base_directory,
+            ops,
+            target_unique_id: target_base_id,
+            offset_range: offset_range.clone(),
+            start_instant: start_instant.clone(),
+            _blob_data: None,
+            blob_name: None,
+            file: None,
+        })))
+    }
+
+    // pwrite the buffer in IoPacket at io_offset_range.
+    fn write(&self, io_packet: &mut IoPacket) {
+        let offset_range = io_packet.io_offset_range().clone();
+
+        if offset_range.start < self.offset_range.start || offset_range.end > self.offset_range.end
+        {
+            io_packet.set_error(Error::new(ErrorKind::AddrInUse, "Offset out of range!"));
+            return;
+        }
+
+        let raw_fd = self.file.as_ref().unwrap().as_raw_fd().clone();
+        let b = io_packet.buffer_mut();
+
+        let ret = unsafe {
+            // SAFETY: raw_fd is a valid open descriptor and `b` outlives
+            // the call; pwrite reads at most b.len() bytes from it.
+            pwrite(raw_fd, b.as_ptr() as *const c_void, b.len(), offset_range.start as i64)
+        };
+        debug!(
+            "do_write: {}..{} {}",
+            io_packet.io_offset_range().start,
+            io_packet.io_offset_range().end,
+            ret
+        );
+        if ret < 0 {
+            io_packet.set_error(Error::last_os_error());
+        } else if (ret as usize) < b.len() {
+            // Bug fix: short writes were detected with a hard-coded
+            // `ret < 8`; compare against the requested length instead.
+            io_packet.set_error(Error::new(ErrorKind::Other, "oh no!"));
+        }
+    }
+
+    fn open(&self, _io_packet: &mut IoPacket) {
+        let blob_name = format!("{}/{}", self.target_base_directory, self.blob_name.as_ref().unwrap());
+        // TODO(review): the handle is dropped immediately; it should be
+        // stored in self.file once these methods take &mut self.
+        let _file = OpenOptions::new().write(true).append(false).open(blob_name).unwrap();
+    }
+
+    fn create(&self, _io_packet: &mut IoPacket) {
+        // Bug fix: create now builds the path under the base directory,
+        // consistent with open().
+        let blob_name = format!("{}/{}", self.target_base_directory, self.blob_name.as_ref().unwrap());
+        let _file = OpenOptions::new().append(false).create(true).open(blob_name);
+        // Bug fix: the message said "open"; this is the create path.
+        error!("create not yet supported {}", _io_packet.sequence_number());
+        process::abort();
+    }
+
+    fn truncate(&self, _io_packet: &mut IoPacket) {
+        // Bug fix: the message said "open"; this is the truncate path.
+        error!("truncate not yet supported {}", _io_packet.sequence_number());
+        process::abort();
+    }
+
+    fn exit(&self, io_packet: &mut IoPacket) {
+        debug!("Nothing to do for exit path {}", io_packet.sequence_number());
+    }
+}
+
+impl Target for BlobBlockingTarget {
+    /// Nothing to set up yet for blob targets.
+    fn setup(&mut self, _file_name: &String, _range: Range<u64>) -> Result<()> {
+        Ok(())
+    }
+
+    /// Builds a BlobIoPacket for the given operation and range.
+    fn create_io_packet(
+        &self,
+        operation_type: OperationType,
+        seq: u64,
+        seed: u64,
+        io_offset_range: Range<u64>,
+        target: &TargetType,
+    ) -> IoPacketType {
+        Box::new(BlobIoPacket::new(operation_type, seq, seed, &io_offset_range, target))
+    }
+
+    fn id(&self) -> u64 {
+        self.target_unique_id
+    }
+
+    fn supported_ops(&self) -> &TargetOps {
+        &self.ops
+    }
+
+    /// Dispatches the packet to the matching handler; aborts on anything
+    /// this target does not support.
+    fn do_io(&self, io_packet: &mut IoPacket) {
+        match io_packet.operation_type() {
+            OperationType::Write => self.write(io_packet),
+            OperationType::Truncate => self.truncate(io_packet),
+            OperationType::Open => self.open(io_packet),
+            OperationType::Create => self.create(io_packet),
+            OperationType::Exit => self.exit(io_packet),
+            _ => {
+                error!("Unsupported operation");
+                process::abort();
+            }
+        };
+    }
+
+    fn is_complete(&self, io_packet: &IoPacket) -> bool {
+        match io_packet.operation_type() {
+            OperationType::Write | OperationType::Open | OperationType::Exit => true,
+            _ => {
+                error!("Complete for unsupported operation");
+                process::abort();
+            }
+        }
+    }
+
+    fn verify_needs_io(&self, io_packet: &IoPacket) -> bool {
+        match io_packet.operation_type() {
+            OperationType::Write | OperationType::Open | OperationType::Exit => false,
+            _ => {
+                error!("verify_needs_io for unsupported operation");
+                process::abort();
+            }
+        }
+    }
+
+    fn generate_verify_io(&self, io_packet: &mut IoPacket) {
+        // No blob operation supports verify-IO generation yet.
+        match io_packet.operation_type() {
+            _ => {
+                error!("generate_verify_io for unsupported operation");
+                process::abort();
+            }
+        };
+    }
+
+    fn verify(&self, io_packet: &mut IoPacket, _verify_packet: &IoPacket) -> bool {
+        match io_packet.operation_type() {
+            OperationType::Write | OperationType::Exit => true,
+            _ => {
+                error!("verify for unsupported operation");
+                process::abort();
+            }
+        }
+    }
+
+    fn start_instant(&self) -> Instant {
+        self.start_instant.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        crate::blob_target::BlobBlockingTarget,
+        crate::operations::OperationType,
+        crate::operations::TargetType,
+        std::{fs, fs::File, time::Instant},
+    };
+
+    static FILE_LENGTH: u64 = 1 * 1024 * 1024; // 1 MiB
+
+    /// Creates a FILE_LENGTH-byte file and wraps it in a BlobBlockingTarget.
+    fn setup(file_name: &String) -> TargetType {
+        let f = File::create(&file_name).unwrap();
+        f.set_len(FILE_LENGTH).unwrap();
+        let start_instant: Instant = Instant::now();
+        BlobBlockingTarget::new(file_name.to_string(), 0, &(0..FILE_LENGTH), &start_instant)
+            .unwrap()
+    }
+
+    /// Removes the file created by setup().
+    fn teardown(file_name: &String) {
+        // Bug fix: the removal was wrapped in `if false`, so test files
+        // leaked into /tmp on every run. Errors are ignored so teardown
+        // never masks a real test failure.
+        let _ = fs::remove_file(file_name);
+    }
+
+    // NOTE(review): BlobBlockingTarget::new leaves `file` as None and
+    // write() unwraps it — confirm these tests pass against the target's
+    // open/create sequencing.
+    #[test]
+    fn simple_write() {
+        let file_name = "/tmp/BlobBlockingTargetTestFile".to_string();
+
+        let target = setup(&file_name);
+        let mut io_packet = target.create_io_packet(OperationType::Write, 0, 0, 0..4096, &target);
+        let mut _buffer = io_packet.buffer_mut();
+        io_packet.do_io();
+        assert_eq!(io_packet.is_complete(), true);
+        io_packet.get_error().unwrap();
+        teardown(&file_name);
+    }
+
+    #[test]
+    fn write_failure() {
+        let file_name = "/tmp/BlobBlockingTargetTestFile2".to_string();
+
+        let target = setup(&file_name);
+
+        // Try to write beyond allowed offset range
+        let mut io_packet = target.create_io_packet(
+            OperationType::Write,
+            0,
+            0,
+            (2 * FILE_LENGTH)..(3 * FILE_LENGTH),
+            &target,
+        );
+        let mut _buffer = io_packet.buffer_mut();
+        io_packet.do_io();
+        assert_eq!(io_packet.is_complete(), true);
+        assert_eq!(io_packet.get_error().is_err(), true);
+        teardown(&file_name);
+    }
+}
diff --git a/garnet/bin/odu/src/file_target.rs b/garnet/bin/odu/src/file_target.rs
index 58955ce..ebbaab1 100644
--- a/garnet/bin/odu/src/file_target.rs
+++ b/garnet/bin/odu/src/file_target.rs
@@ -180,7 +180,11 @@
         offset_range: &Range<u64>,
         start_instant: &Instant,
     ) -> Result<TargetType> {
-        let ops = TargetOps { write: Some(OperationType::Write), open: Some(OperationType::Open) };
+        let ops = TargetOps { write: Some(OperationType::Write),
+                              truncate: None,
+                              open: Some(OperationType::Open),
+                              create: None,
+                            };
         //let file = OpenOptions::new().append(false).open(&target_name).unwrap();
         let file = OpenOptions::new().write(true).append(false).open(&target_name).unwrap();
         Ok(Arc::new(Box::new(FileBlockingTarget {
diff --git a/garnet/bin/odu/src/generator.rs b/garnet/bin/odu/src/generator.rs
index f1e525a..19b0277 100644
--- a/garnet/bin/odu/src/generator.rs
+++ b/garnet/bin/odu/src/generator.rs
@@ -7,6 +7,7 @@
 //! Generator thread accept a set of serializable arguments.
 use {
     crate::file_target::FileBlockingTarget,
+    crate::blob_target::BlobBlockingTarget,
     crate::issuer::{run_issuer, IssuerArgs},
     crate::log::Stats,
     crate::operations::{OperationType, PipelineStages, Target},
@@ -122,7 +123,7 @@
     fn get_io_range(&self) -> Range<u64>;
 
     /// Generates and fills the buf with data.
-    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, offset_range: &Range<u64>);
+    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, operation_id: u64, offset_range: &Range<u64>);
 }
 
 /// GeneratorArgs contains only the fields that help generator make decisions
@@ -261,13 +262,16 @@
     start_instant: &Instant,
 ) -> Arc<Box<Target + Send + Sync>> {
     // Manually check what is passed is what is supported.
-    if target_type != "target_file" {
-        error!("Target type {} not supported", target_type);
-        process::abort();
+    if target_type == "target_file" {
+            return FileBlockingTarget::new(target_name.to_string(), target_id, offset_range, start_instant)
+        .unwrap();
+    } else if target_type == "target_blob" {
+        return BlobBlockingTarget::new(target_name.to_string(), target_id, offset_range, start_instant)
+        .unwrap();
     }
 
-    FileBlockingTarget::new(target_name.to_string(), target_id, offset_range, start_instant)
-        .unwrap()
+        error!("Target type {} not supported", target_type);
+        process::abort();
 }
 
 /// The main thread for generator. generator creates a set of channels that it
@@ -361,7 +365,7 @@
         io_packet.timestamp_stage_start(&stage);
         io_sequence_number += 1;
         let io_offset_range = io_packet.io_offset_range().clone();
-        gen.fill_buffer(io_packet.buffer_mut(), io_sequence_number, &io_offset_range);
+        gen.fill_buffer(io_packet.buffer_mut(), io_sequence_number, op_type as u64, &io_offset_range);
         {
             let mut map = io_map.lock().unwrap();
             map.insert(io_packet.sequence_number().clone(), io_packet.clone());
diff --git a/garnet/bin/odu/src/issuer.rs b/garnet/bin/odu/src/issuer.rs
index 2d9582c..2a26961 100644
--- a/garnet/bin/odu/src/issuer.rs
+++ b/garnet/bin/odu/src/issuer.rs
@@ -69,7 +69,7 @@
     let mut scan_verifier = true;
 
     // True if the current command is from verifier.
-    let mut verifying_cmd;
+    let mut verifying_cmd:bool;
 
     let mut cmd_or_err;
     let mut cmd;
@@ -77,8 +77,7 @@
     // This thread/loop is not done till we hear explicitly from generator and
     // from verifier that they both are done. We keep track of who is done.
     while scan_generator || scan_verifier {
-        // true if the current command is received from verifier
-        let mut verifying_cmd = false;
+        verifying_cmd = false;
 
         // May block
         args.active_commands.decrement();
diff --git a/garnet/bin/odu/src/main.rs b/garnet/bin/odu/src/main.rs
index 3b52117..c8284b5 100644
--- a/garnet/bin/odu/src/main.rs
+++ b/garnet/bin/odu/src/main.rs
@@ -11,6 +11,7 @@
 mod operations;
 mod sequential_io_generator;
 mod verifier;
+mod blob_target;
 
 extern crate serde;
 
@@ -28,11 +29,31 @@
         thread::spawn,
         time::Instant,
     },
+    fuchsia_merkle::*,
 };
 
 // Magic number that gets written in block header
 static MAGIC_NUMBER: u64 = 0x4f6475346573742e;
 
+/// Creates a 64 KiB blob of 0xff bytes under /blob, named by its merkle
+/// root as blobfs requires, then writes the content.
+fn create_blob() {
+    let data = vec![0xff; 8192];
+    let mut builder = MerkleTreeBuilder::new();
+    for _i in 0..8 {
+        builder.write(&data[..]);
+    }
+
+    let root = builder.finish();
+    println!("{}", root.root());
+
+    // Bug fix: `String::from(format!(..))` was redundant; format! already
+    // yields a String.
+    let blob_name = format!("/blob/{}", root.root().to_string());
+    println!("{}", blob_name);
+    let mut f = File::create(&blob_name).unwrap();
+    f.set_len(8192 * 8).unwrap();
+    for _i in 0..8 {
+        // Bug fix: `write` may perform a short write and its result was
+        // only unwrapped, not length-checked; `write_all` writes fully.
+        f.write_all(&data[..]).unwrap();
+    }
+}
+
 fn create_target(target_name: &String, target_length: u64) {
     let metadata = metadata(&target_name);
 
@@ -70,6 +91,7 @@
 }
 
 fn main() -> Result<(), Error> {
+    create_blob();
     // These are a bunch of inputs that each generator thread receives. These
     // should be received as input to the app.
     // TODO(auradkar): Implement args parsing and validation logic.
diff --git a/garnet/bin/odu/src/operations.rs b/garnet/bin/odu/src/operations.rs
index f3fc380..49a527c 100644
--- a/garnet/bin/odu/src/operations.rs
+++ b/garnet/bin/odu/src/operations.rs
@@ -28,12 +28,12 @@
     Abort,
     //    Read,
     //    LSeek,
-    //    Truncate,
+    Truncate,
     //    Close,
     //    FSync,
     //
     //    /// DirOps
-    //    Create,
+    Create,
     //    Unlink,
     //    CreateDir,
     //    DeleteDir,
@@ -126,14 +126,16 @@
 #[derive(Clone)]
 pub struct TargetOps {
     pub write: Option<OperationType>,
+    pub truncate: Option<OperationType>,
     pub open: Option<OperationType>,
+    pub create: Option<OperationType>,
     //    read: Option<OperationType>,
     //    lseek: Option<OperationType>,
     //    close: Option<OperationType>,
     //    fsync: Option<OperationType>,
     //
     //
-    //    create: Option<OperationType>,
+    
     //    unlink: Option<OperationType>,
     //    createdir: Option<OperationType>,
     //    deletedir: Option<OperationType>,
diff --git a/garnet/bin/odu/src/sequential_io_generator.rs b/garnet/bin/odu/src/sequential_io_generator.rs
index 0ec8ac0..ba560e3 100644
--- a/garnet/bin/odu/src/sequential_io_generator.rs
+++ b/garnet/bin/odu/src/sequential_io_generator.rs
@@ -106,7 +106,7 @@
     /// block.
     /// TODO(auradkar): SInce current IOs are always 4KiB aligned, this function
     /// works well. But not all unaligned cases are covered by this function.
-    fn write_headers(&self, buf: &mut Vec<u8>, sequence_number: u64, offset_range: &Range<u64>) {
+    fn write_headers(&self, buf: &mut Vec<u8>, sequence_number: u64, operation_id: u64, offset_range: &Range<u64>) {
         let start = round_up(offset_range.start as usize, self.block_size as usize);
         let end = offset_range.end as usize;
         let header_size = mem::size_of::<Header>();
@@ -119,6 +119,7 @@
                 self.fd_unique_id,
                 self.generator_unique_id,
                 sequence_number,
+                operation_id,
                 offset as u64,
                 buf.capacity() as u64,
                 self.last_number,
@@ -181,9 +182,9 @@
         (cur..end)
     }
 
-    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, offset_range: &Range<u64>) {
+    fn fill_buffer(&self, buf: &mut Vec<u8>, sequence_number: u64, operation_id:u64, offset_range: &Range<u64>) {
         self.zero_fill(buf);
-        self.write_headers(buf, sequence_number, offset_range);
+        self.write_headers(buf, sequence_number, operation_id, offset_range);
     }
 }
 
@@ -207,9 +208,13 @@
     /// updated this block
     generator_unique_id: u64,
 
-    /// io_op_unique_id tells which io operation updated this block
+    /// io_op_unique_id tells which io operation updated this block. This is the
+    /// generated ios and need not be same as order of completion.
     io_op_unique_id: u64,
 
+    // io_op_id tells which operation was issued
+    io_op_id: u64,
+
     // file_offset is offset within the file where this data
     // should be found
     file_offset: u64,
@@ -232,6 +237,7 @@
         fd_unique_id: u64,
         generator_unique_id: u64,
         io_op_unique_id: u64,
+        io_op_id: u64,
         file_offset: u64,
         size: u64,
         seed: u64,
@@ -242,6 +248,7 @@
             fd_unique_id: fd_unique_id,
             generator_unique_id: generator_unique_id,
             io_op_unique_id: io_op_unique_id,
+            io_op_id: io_op_id,
             file_offset: file_offset,
             size: size,
             seed: seed,
@@ -276,6 +283,10 @@
         header.io_op_unique_id = LittleEndian::read_u64(&val64);
 
         cursor.read_exact(&mut val64).unwrap();
+        header.io_op_id = LittleEndian::read_u64(&val64);
+
+
+        cursor.read_exact(&mut val64).unwrap();
         header.file_offset = LittleEndian::read_u64(&val64);
 
         cursor.read_exact(&mut val64).unwrap();
@@ -310,6 +321,9 @@
         LittleEndian::write_u64(&mut val64, self.io_op_unique_id);
         cursor.write_all(&val64).unwrap();
 
+        LittleEndian::write_u64(&mut val64, self.io_op_id);
+        cursor.write_all(&val64).unwrap();
+
         LittleEndian::write_u64(&mut val64, self.file_offset);
         cursor.write_all(&val64).unwrap();
 
@@ -458,13 +472,15 @@
         let mut buf = vec![0 as u8; block_size as usize];
         let io_offset_range = block_size..2 * 4096;
         let io_op_unique_id = 10 as u64;
-        gen.fill_buffer(&mut buf, io_op_unique_id, &io_offset_range);
+        let io_op_id = 20 as u64;
+        gen.fill_buffer(&mut buf, io_op_unique_id, io_op_id, &io_offset_range);
         let header: Header = Header::read_header(&buf);
         assert_eq!(header.magic_number, MAGIC_NUMBER);
         assert_eq!(header.process_id, PROCESS_ID);
         assert_eq!(header.fd_unique_id, TARGET_ID);
         assert_eq!(header.generator_unique_id, GENERATOR_ID);
         assert_eq!(header.io_op_unique_id, io_op_unique_id);
+        assert_eq!(header.io_op_id, io_op_id);
         assert_eq!(header.file_offset, io_offset_range.start);
         assert_eq!(header.size, io_offset_range.end - io_offset_range.start);
     }