Skip to content

Commit

Permalink
More buf builders (#144)
Browse files Browse the repository at this point in the history
* bitarray bufbuilder

* adjacency list buf builder

* resolve warnings in test compilation

* removed unnecessary lifetimes

* conform to clippy expectations in ci
  • Loading branch information
matko authored Sep 15, 2023
1 parent 03fa9a4 commit f807c74
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/layer/internal/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ pub async fn open_base_triple_stream<F: 'static + FileLoad + FileStore>(
}

#[cfg(test)]
pub mod tests {
pub mod base_tests {
use super::*;
use crate::storage::memory::*;
use futures::stream::TryStreamExt;
Expand Down
4 changes: 2 additions & 2 deletions src/layer/internal/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,9 @@ pub async fn open_child_triple_stream<F: 'static + FileLoad + FileStore>(
}

#[cfg(test)]
pub mod tests {
pub mod child_tests {
use super::*;
use crate::layer::base::tests::*;
use crate::layer::base::base_tests::*;
use crate::storage::memory::*;
use futures::stream::TryStreamExt;

Expand Down
2 changes: 1 addition & 1 deletion src/layer/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ mod tests {
assert_eq!(1, layer.triple_layer_removal_count().unwrap());
}

use crate::layer::base::tests::*;
use crate::layer::base::base_tests::*;
#[tokio::test]
async fn base_layer_with_gaps_addition_count() {
let files = base_layer_files();
Expand Down
2 changes: 1 addition & 1 deletion src/layer/internal/object_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Iterator for InternalTripleObjectIterator {
#[cfg(test)]
mod tests {
use super::*;
use crate::layer::base::tests::*;
use crate::layer::base::base_tests::*;
use crate::storage::memory::*;
use crate::storage::*;

Expand Down
4 changes: 2 additions & 2 deletions src/layer/internal/predicate_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ impl Iterator for InternalTriplePredicateIterator {

#[cfg(test)]
mod tests {
use crate::layer::base::tests::*;
use crate::layer::child::tests::*;
use crate::layer::base::base_tests::*;
use crate::layer::child::child_tests::*;
use crate::layer::*;

use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions src/layer/internal/subject_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ impl Iterator for InternalTripleStackIterator {

#[cfg(test)]
mod tests {
use crate::layer::base::tests::*;
use crate::layer::child::tests::*;
use crate::layer::base::base_tests::*;
use crate::layer::child::child_tests::*;
use crate::layer::*;
use crate::structure::TdbDataType;

Expand Down
4 changes: 2 additions & 2 deletions src/layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ impl ObjectType {
#[cfg(test)]
mod tests {
use super::*;
use crate::layer::internal::base::tests::base_layer_files;
use crate::layer::internal::base::base_tests::base_layer_files;
use crate::layer::internal::base::BaseLayer;
use crate::layer::internal::child::tests::child_layer_files;
use crate::layer::internal::child::child_tests::child_layer_files;
use crate::layer::internal::child::ChildLayer;
use crate::layer::internal::InternalLayer;
use crate::layer::simple_builder::{LayerBuilder, SimpleLayerBuilder};
Expand Down
151 changes: 151 additions & 0 deletions src/structure/adjacencylist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::io;
use std::pin::Pin;

use bytes::Bytes;
use bytes::BytesMut;

use super::bitarray::*;
use super::bitindex::*;
Expand All @@ -36,6 +37,15 @@ impl AdjacencyList {
AdjacencyList { nums, bits }
}

pub fn from_buffers(buffers: AdjacencyListBuffers) -> AdjacencyList {
Self::parse(
buffers.nums,
buffers.bits,
buffers.bitindex_blocks,
buffers.bitindex_sblocks,
)
}

pub fn parse(
nums_slice: Bytes,
bits_slice: Bytes,
Expand Down Expand Up @@ -224,6 +234,82 @@ pub async fn adjacency_list_stream_pairs<F: 'static + FileLoad>(
)
}

pub struct UnindexedAdjacencyListBufBuilder {
bitarray: BitArrayBufBuilder<BytesMut>,
nums: LogArrayBufBuilder<BytesMut>,
last_left: u64,
last_right: u64,
}

impl UnindexedAdjacencyListBufBuilder {
pub fn new(width: u8) -> Self {
Self {
bitarray: BitArrayBufBuilder::new(BytesMut::new()),
nums: LogArrayBufBuilder::new(BytesMut::new(), width),
last_left: 0,
last_right: 0,
}
}

pub fn push(&mut self, left: u64, right: u64) {
// the tricky thing with this code is that the bitarray lags one entry behind the logarray.
// The reason for this is that at push time, we do not yet know if this entry is going to be
// the last entry for `left`, we only know this when we push a greater `left` later on.
if left < self.last_left || (left == self.last_left && right <= self.last_right) {
panic!("tried to push an unordered adjacent pair");
}

// the left hand side of the adjacencylist is expected to be a continuous range from 1 up to the max
// but when adding entries, there may be holes. We handle holes by writing a '0' to the logarray
// (which is otherwise an invalid right-hand side) and pushing a 1 onto the bitarray to immediately close the segment.
let skip = left - self.last_left;

if self.last_left == 0 && skip == 1 {
// this is the first entry. we can't push a bit yet
} else if skip == 0 {
// same `left` as before. so the previous entry was not the last one, and the bitarray gets a 0 appended.
self.bitarray.push(false);
} else {
// if this is the first element, but we do need to skip, make sure we write one less bit than we'd usually do
let bitskip = if self.last_left == 0 { skip - 1 } else { skip };
// there's a different `left`. we push a bunch of 1s to the bitarray, and 0s to the num array.
for _ in 0..bitskip {
self.bitarray.push(true);
}
for _ in 0..(skip - 1) {
self.nums.push(0);
}
}

// finally push right to the logarray
self.nums.push(right);
self.last_left = left;
self.last_right = right;
}

pub fn push_all<I: Iterator<Item = (u64, u64)>>(&mut self, mut iter: I) {
while let Some((left, right)) = iter.next() {
self.push(left, right);
}
}

pub fn finalize(mut self) -> (Bytes, Bytes) {
if self.nums.count() != 0 {
// push last bit to bitarray
self.bitarray.push(true);
}

let ba = self.bitarray.finalize();
let nums = self.nums.finalize();

(ba.freeze(), nums.freeze())
}

pub fn count(&self) -> u64 {
self.bitarray.count()
}
}

pub struct UnindexedAdjacencyListBuilder<W1: SyncableFile, W2: SyncableFile> {
bitarray: BitArrayFileBuilder<W1>,
nums: LogArrayFileBuilder<W2>,
Expand Down Expand Up @@ -307,6 +393,59 @@ impl<W1: SyncableFile, W2: SyncableFile> UnindexedAdjacencyListBuilder<W1, W2> {
}
}

pub struct AdjacencyListBufBuilder {
builder: UnindexedAdjacencyListBufBuilder,
bitindex_blocks: BytesMut,
bitindex_sblocks: BytesMut,
}

impl AdjacencyListBufBuilder {
pub fn new(width: u8) -> AdjacencyListBufBuilder {
AdjacencyListBufBuilder {
builder: UnindexedAdjacencyListBufBuilder::new(width),
bitindex_blocks: BytesMut::new(),
bitindex_sblocks: BytesMut::new(),
}
}

pub fn push(&mut self, left: u64, right: u64) {
self.builder.push(left, right)
}

pub fn push_all<I: Iterator<Item = (u64, u64)>>(&mut self, iter: I) {
self.builder.push_all(iter)
}

pub fn finalize(self) -> AdjacencyListBuffers {
let AdjacencyListBufBuilder {
builder,
mut bitindex_blocks,
mut bitindex_sblocks,
} = self;
let (bitfile, nums) = builder.finalize();

build_bitindex_from_buf(&bitfile[..], &mut bitindex_blocks, &mut bitindex_sblocks);

AdjacencyListBuffers {
nums,
bits: bitfile,
bitindex_blocks: bitindex_blocks.freeze(),
bitindex_sblocks: bitindex_sblocks.freeze(),
}
}

pub fn count(&self) -> u64 {
self.builder.count()
}
}

pub struct AdjacencyListBuffers {
nums: Bytes,
bits: Bytes,
bitindex_blocks: Bytes,
bitindex_sblocks: Bytes,
}

pub struct AdjacencyListBuilder<F, W1, W2, W3>
where
F: 'static + FileLoad + FileStore,
Expand Down Expand Up @@ -778,4 +917,16 @@ mod tests {
result
);
}

#[test]
fn adjacencylist_buf_builder_works() {
let adjacencies = [(1, 1), (1, 5), (2, 3), (2, 7), (4, 8)];
let mut builder = AdjacencyListBufBuilder::new(8);
builder.push_all(adjacencies.iter().copied());
let buffers = builder.finalize();
let aj = AdjacencyList::from_buffers(buffers);

let result: Vec<_> = aj.iter().collect();
assert_eq!(&adjacencies[..], &result[..]);
}
}
76 changes: 70 additions & 6 deletions src/structure/bitarray.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use super::util;
use crate::storage::*;
use crate::structure::bititer::BitIter;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::io;
use futures::stream::{Stream, StreamExt, TryStreamExt};
use std::{convert::TryFrom, error, fmt};
Expand Down Expand Up @@ -200,6 +200,70 @@ impl BitArray {
}
}

pub struct BitArrayBufBuilder<B> {
/// Destination of the bit array data.
dest: B,
/// Storage for the next word to be written.
current: u64,
/// Number of bits written to the buffer
count: u64,
}

impl<B: BufMut> BitArrayBufBuilder<B> {
pub fn new(dest: B) -> BitArrayBufBuilder<B> {
BitArrayBufBuilder {
dest,
current: 0,
count: 0,
}
}

pub fn push(&mut self, bit: bool) {
// Set the bit in the current word.
if bit {
// Determine the position of the bit to be set from `count`.
let pos = self.count & 0b11_1111;
self.current |= 0x8000_0000_0000_0000 >> pos;
}

// Advance the bit count.
self.count += 1;

// Check if the new `count` has reached a word boundary.
if self.count & 0b11_1111 == 0 {
// We have filled `current`, so write it to the destination.
self.dest.put_u64(self.current);
self.current = 0;
}
}

pub fn push_all<I: Iterator<Item = bool>>(&mut self, mut iter: I) {
while let Some(bit) = iter.next() {
self.push(bit);
}
}

fn finalize_data(&mut self) {
if self.count & 0b11_1111 != 0 {
self.dest.put_u64(self.current);
}
}

pub fn finalize(mut self) -> B {
let count = self.count;
// Write the final data word.
self.finalize_data();
// Write the control word.
self.dest.put_u64(count);

self.dest
}

pub fn count(&self) -> u64 {
self.count
}
}

pub struct BitArrayFileBuilder<W> {
/// Destination of the bit array data.
dest: W,
Expand Down Expand Up @@ -317,22 +381,22 @@ pub fn bitarray_stream_blocks<R: AsyncRead + Unpin>(r: R) -> FramedRead<R, BitAr
FramedRead::new(r, BitArrayBlockDecoder { readahead: None })
}

pub fn bitarray_iter_blocks<B: Buf>(b: &mut B) -> BitArrayBlockIterator<B> {
pub fn bitarray_iter_blocks<B: Buf>(b: B) -> BitArrayBlockIterator<B> {
BitArrayBlockIterator {
buf: b,
readahead: None,
}
}

pub struct BitArrayBlockIterator<'a, B: Buf> {
buf: &'a mut B,
pub struct BitArrayBlockIterator<B: Buf> {
buf: B,
readahead: Option<u64>,
}

impl<'a, B: Buf> Iterator for BitArrayBlockIterator<'a, B> {
impl<B: Buf> Iterator for BitArrayBlockIterator<B> {
type Item = u64;
fn next(&mut self) -> Option<u64> {
decode_next_bitarray_block(self.buf, &mut self.readahead)
decode_next_bitarray_block(&mut self.buf, &mut self.readahead)
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/structure/bitindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,10 @@ pub async fn build_bitindex<
Ok(())
}

pub fn build_bitindex_from_block_iter<'a, I: Iterator<Item = u64>, B1: BufMut, B2: BufMut>(
blocks_iter: &'a mut I,
blocks: &mut B1,
sblocks: &mut B2,
pub fn build_bitindex_from_block_iter<I: Iterator<Item = u64>, B1: BufMut, B2: BufMut>(
blocks_iter: I,
blocks: B1,
sblocks: B2,
) {
// the following widths are unoptimized, but should always be large enough
let mut blocks_builder =
Expand Down Expand Up @@ -474,9 +474,9 @@ pub fn build_bitindex_from_block_iter<'a, I: Iterator<Item = u64>, B1: BufMut, B
}

pub fn build_bitindex_from_buf<B1: Buf, B2: BufMut, B3: BufMut>(
bitarray: &mut B1,
blocks: &mut B2,
sblocks: &mut B3,
bitarray: B1,
blocks: B2,
sblocks: B3,
) {
let mut iter = bitarray_iter_blocks(bitarray);
build_bitindex_from_block_iter(&mut iter, blocks, sblocks)
Expand Down
Loading

0 comments on commit f807c74

Please sign in to comment.