import os.path
import shutil
from datetime import datetime
import bcolz
import numpy as np
import tiledb
GENOME_DOMAIN_NAME = "genome_coord"
SECONDARY_DOMAIN_NAME = "signal_coord"
GENOME_VALUE_NAME = "v"
DEFAULT_GENOME_TILE_EXTENT = 1000
DEFAULT_COMPRESSOR = "lz4"
DEFAULT_COMPRESSION_LEVEL = -1
def write_tiledb(arr, path, overwrite=True):
"""Write a tiledb to disk.
"""
if os.path.exists(path) and os.path.isdir(path) and overwrite:
shutil.rmtree(path)
if os.path.exists(path):
raise FileExistsError("Output path {} already exists".format(path))
ctx = tiledb.Ctx()
n = arr.shape[0]
n_tile_extent = min(DEFAULT_GENOME_TILE_EXTENT, n)
d1 = tiledb.Dim(
ctx, GENOME_DOMAIN_NAME, domain=(0, n - 1), tile=n_tile_extent, dtype="uint32"
)
if arr.ndim == 1:
domain = tiledb.Domain(ctx, d1)
elif arr.ndim == 2:
m = arr.shape[1]
d2 = tiledb.Dim(
ctx, SECONDARY_DOMAIN_NAME, domain=(0, m - 1), tile=m, dtype="uint32"
)
domain = tiledb.Domain(ctx, d1, d2)
else:
raise ValueError("tiledb backend only supports 1D or 2D arrays")
v = tiledb.Attr(ctx, GENOME_VALUE_NAME, compressor=(DEFAULT_COMPRESSOR, DEFAULT_COMPRESSION_LEVEL), dtype="float32")
A = tiledb.DenseArray(
ctx,
path,
domain=domain,
attrs=(v,),
cell_order="row-major",
tile_order="row-major",
)
values = arr.astype(np.float32)
A[:] = {GENOME_VALUE_NAME: values}
def load_tiledb(path):
return TDBDenseArray(path)
class TDBDenseArray(object):
"""A read-only wrapper of tiledb.DenseArray"""
def __init__(self, array):
if isinstance(array, tiledb.DenseArray):
self._arr = array
else:
ctx = tiledb.Ctx()
self._arr = tiledb.DenseArray.load(ctx, array)
def __getitem__(self, key):
return self._arr[key][GENOME_VALUE_NAME]
def __setitem__(self, key, item):
raise NotImplemented("TDBDenseArray is read-only")
@property
def shape(self):
return self._arr.shape
@property
def ndim(self):
return self._arr.ndim
a = load_tiledb('outputs/GM12878_CTCF_ENCFF312KXX.bigWig_tiledb/chr10')
am = a[:]
write_tiledb(am, 'test_blosc_lz4')
def bench(path, n_access=1000, access_len=1000):
if os.path.basename(os.path.dirname(path)).endswith('bcolz'):
arr = bcolz.open(path, mode='r')
else:
arr = load_tiledb(path)
nd = arr.ndim
starts = np.random.randint(1, arr.shape[0] - access_len - 1, n_access).astype(np.int32)
stops = np.array([s + access_len for s in starts]).astype(np.int32)
dt1 = datetime.now()
for start, stop in zip(starts, stops):
if nd == 1:
_ = arr[start:stop]
else:
_ = arr[start:stop, :]
dt2 = datetime.now()
ms = (dt2 - dt1).total_seconds() * 1000
return ms
print(bench('test_blosc_lz4'))
! cp -aR test_blosc_lz4 /dev/shm/cproberttmp/
print(bench('/dev/shm/cproberttmp/test_blosc_lz4'))
bench('outputs/GM12878_CTCF_ENCFF312KXX.bigWig_bcolz/chr1')
res = []
compr = []
ext = []
for DEFAULT_COMPRESSOR in ("zstd", "lz4", "blosc-lz", "blosc-lz4", "blosc-lz4hc", "blosc-snappy", "blosc-zstd", "rle", "bzip2"):
for DEFAULT_GENOME_TILE_EXTENT in (2000, 5000, 10000, 50000, 100000):
write_tiledb(am, 'test_tiledb')
tm = bench('test_tiledb')
print('{} {}: {}'.format(DEFAULT_COMPRESSOR, DEFAULT_GENOME_TILE_EXTENT, tm))
res.append(tm)
compr.append(DEFAULT_COMPRESSOR)
ext.append(DEFAULT_GENOME_TILE_EXTENT)
import matplotlib.pylab as plt
%matplotlib inline
plt.scatter(ext, res)
plt.xscale('log')
res2 = []
compr2 = []
ext2 = []
for DEFAULT_COMPRESSOR in ("lz4", "blosc-lz", "blosc-lz4", "blosc-lz4hc", "blosc-snappy", "blosc-zstd"):
for DEFAULT_GENOME_TILE_EXTENT in (5000, 7500, 10000, 15000, 20000):
write_tiledb(am, 'test_tiledb')
tm = bench('test_tiledb')
print('{} {}: {}'.format(DEFAULT_COMPRESSOR, DEFAULT_GENOME_TILE_EXTENT, tm))
res2.append(tm)
compr2.append(DEFAULT_COMPRESSOR)
ext2.append(DEFAULT_GENOME_TILE_EXTENT)
for r, c, e in zip(res, compr, ext):
if e == 10000:
print('{} {}'.format(c, r))
plt.plot(ext2, res2)
plt.xscale('log')
res3 = []
compr3 = []
ext3 = []
lev3 = []
for DEFAULT_COMPRESSOR in ("lz4", "blosc-lz", "blosc-lz4"):
for DEFAULT_GENOME_TILE_EXTENT in (5000, 7000, 9000):
for DEFAULT_COMPRESSION_LEVEL in (3, 5, 7, -1):
write_tiledb(am, 'test_tiledb')
tm = bench('test_tiledb')
print('{} {} {}: {}'.format(DEFAULT_COMPRESSOR, DEFAULT_GENOME_TILE_EXTENT, DEFAULT_COMPRESSION_LEVEL, tm))
res2.append(tm)
compr2.append(DEFAULT_COMPRESSOR)
ext2.append(DEFAULT_GENOME_TILE_EXTENT)
lev3.append(DEFAULT_COMPRESSION_LEVEL)
DEFAULT_GENOME_TILE_EXTENT = 1000
def write_sparse_array(path, n, n_idxs, values, overwrite=True):
if os.path.exists(path) and os.path.isdir(path) and overwrite:
shutil.rmtree(path)
if os.path.exists(path):
raise FileExistsError("Output path {} already exists".format(path))
if n_idxs.min() < 0 or n_idxs.max() >= n:
raise ValueError('row indexes must be in range [0, n - 1]')
ctx = tiledb.Ctx()
n_tile_extent = min(DEFAULT_GENOME_TILE_EXTENT, n)
d1 = tiledb.Dim(ctx, GENOME_DOMAIN_NAME, domain=(0, n - 1),
tile=n_tile_extent, dtype="uint32")
domain = tiledb.Domain(ctx, d1)
v = tiledb.Attr(ctx, "v", compressor=('lz4', -1), dtype="float32")
A = tiledb.SparseArray(ctx,
path,
domain=domain,
attrs=(v,),
capacity=1000,
cell_order='row-major',
tile_order='row-major')
values = values.astype(np.float32)
A[n_idxs] = {'v': values}
nz = a.nonzero()[0]
write_sparse_array('test_sparse', a.shape[0], nz, a[nz])
def read_space()
np.unique(am).shape[0]
2 ** 32
2 ** 16