Using Parquet Storage for Large Datasets

Introduction

When working with large brain connectome datasets, storing all matrices in memory as R lists can be inefficient. The riemtan package now supports Apache Arrow Parquet format for efficient storage and lazy loading of SPD matrices.
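As a rough illustration of the memory cost of the list-based approach, here is a sketch using base R only (the matrix count and dimensions are illustrative):

```r
# Approximate footprint of holding 100 dense 200x200 matrices in a list
mats <- lapply(1:100, function(i) diag(200))
format(object.size(mats), units = "MB")  # roughly 30 MB resident at all times
# A Parquet backend keeps only the cached subset (e.g. 10 matrices) in memory.
```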

This vignette demonstrates how to:

- Write SPD matrices to Parquet format with write_connectomes_to_parquet()
- Validate a Parquet directory before use
- Create a CSample backed by lazily loaded Parquet files
- Run standard computations (tangent images, Frechet mean, variation) on a Parquet-backed sample
- Tune caching, batching, and parallel processing for large datasets

Benefits of Parquet Storage

- Lower memory footprint: matrices are loaded on demand instead of held in an in-memory list
- Configurable caching for repeated access
- Metadata and provenance stored alongside the matrices
- Full compatibility with existing CSample and CSuperSample workflows

Installation

Make sure you have the arrow package installed:

install.packages("arrow")

Basic Workflow

1. Writing Connectomes to Parquet

First, let’s create some example SPD matrices and save them to Parquet format:

library(riemtan)
library(Matrix)

# Create example connectomes (4x4 SPD matrices)
set.seed(42)
connectomes <- lapply(1:50, function(i) {
  mat <- diag(4) + matrix(rnorm(16, 0, 0.1), 4, 4)
  mat <- (mat + t(mat)) / 2  # Make symmetric
  mat <- mat + diag(4) * 0.5  # Shift the diagonal so the matrix is safely positive definite
  Matrix::pack(Matrix::Matrix(mat, sparse = FALSE))
})

# Write to Parquet format
write_connectomes_to_parquet(
  connectomes,
  output_dir = "my_connectomes",
  subject_ids = paste0("subject_", 1:50),
  provenance = list(
    study = "Example Study",
    acquisition_date = "2024-01-01",
    preprocessing = "Standard pipeline v1.0"
  )
)

This creates a directory structure:

my_connectomes/
├── metadata.json
├── matrix_0001.parquet
├── matrix_0002.parquet
├── ...
└── matrix_0050.parquet
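You can confirm this layout with base R alone (no riemtan functions needed):

```r
# List the generated files; expect metadata.json plus 50 Parquet files
files <- list.files("my_connectomes")
length(grep("\\.parquet$", files))  # 50
"metadata.json" %in% files          # TRUE
```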

2. Validating Parquet Directory

Before using a Parquet directory, you can validate its structure:

# Detailed validation with verbose output
validate_parquet_directory("my_connectomes", verbose = TRUE)

3. Creating a CSample with Parquet Backend

Now use the Parquet backend to create a CSample:

# Load AIRM metric
data(airm)

# Create Parquet backend (default cache size: 10 matrices)
backend <- create_parquet_backend(
  "my_connectomes",
  cache_size = 10
)

# Create CSample with the backend
sample <- CSample$new(
  backend = backend,
  metric_obj = airm
)

# Sample info
print(paste("Sample size:", sample$sample_size))
print(paste("Matrix dimension:", sample$matrix_size))

4. Computing with Parquet-backed CSample

All standard CSample operations work transparently with the Parquet backend:

# Compute tangent images
sample$compute_tangents()

# Compute vectorized images
sample$compute_vecs()

# Compute Frechet mean
sample$compute_fmean(tol = 0.01, max_iter = 50)

# Center the sample
sample$center()

# Compute variation
sample$compute_variation()
print(paste("Variation:", sample$variation))

5. Lazy Loading Behavior

The Parquet backend loads matrices on-demand:

# Access specific connectome (loads from disk if not cached)
conn_1 <- sample$connectomes[[1]]

# Access all connectomes (loads all from disk)
all_conns <- sample$connectomes

# Cache management
backend$get_cache_size()  # Check current cache usage
backend$clear_cache()     # Clear cache to free memory

Working with CSuperSample

CSuperSample works seamlessly with Parquet-backed samples:

# Create two samples with different backends
backend1 <- create_parquet_backend("study1_connectomes")
backend2 <- create_parquet_backend("study2_connectomes")

sample1 <- CSample$new(backend = backend1, metric_obj = airm)
sample2 <- CSample$new(backend = backend2, metric_obj = airm)

# Create super sample
super_sample <- CSuperSample$new(list(sample1, sample2))

# Gather all connectomes
super_sample$gather()

# Compute statistics
super_sample$compute_fmean()
super_sample$compute_variation()

Backwards Compatibility

The Parquet backend is fully compatible with the traditional list-based approach:

# Traditional approach (all in memory)
sample_memory <- CSample$new(
  conns = connectomes,
  metric_obj = airm
)

# Parquet approach (lazy loading)
backend <- create_parquet_backend("my_connectomes")
sample_parquet <- CSample$new(
  backend = backend,
  metric_obj = airm
)

# Both work identically
sample_memory$compute_fmean()
sample_parquet$compute_fmean()
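Since both samples contain the same matrices, the computed statistics should agree. A quick check, sketched using the fields shown earlier in this vignette:

```r
# Compare variation across storage strategies (expect agreement up to
# floating-point tolerance)
sample_memory$compute_variation()
sample_parquet$compute_variation()
all.equal(sample_memory$variation, sample_parquet$variation)
```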

Performance Tips

Cache Size Tuning

Adjust cache size based on available memory:

# Small cache (memory-constrained environments)
backend_small <- ParquetBackend$new("my_connectomes", cache_size = 5)

# Large cache (memory-rich environments)
backend_large <- ParquetBackend$new("my_connectomes", cache_size = 50)

Batch Operations

For operations that need all matrices, consider loading in batches:

# Process in chunks
n <- sample$sample_size
batch_size <- 10

for (start in seq(1, n, by = batch_size)) {
  end <- min(start + batch_size - 1, n)

  # Load batch
  batch <- lapply(start:end, function(i) backend$get_matrix(i))

  # Process batch...

  # Clear cache to free memory
  backend$clear_cache()
}

Parallel Processing with Parquet

New in v0.2.5: riemtan now supports cross-platform parallel processing that works seamlessly with the Parquet backend.

Enabling Parallel Processing

Configure parallel processing using the futureverse framework:

library(riemtan)

# Enable parallel processing (works on all platforms including Windows!)
set_parallel_plan("multisession", workers = 4)

# Check status
is_parallel_enabled()  # TRUE
get_n_workers()        # 4

# Create Parquet-backed sample
backend <- create_parquet_backend("large_dataset", cache_size = 20)
sample <- CSample$new(backend = backend, metric_obj = airm)

Parallel I/O and Computation

All major operations automatically use parallel processing when beneficial:

# Parallel tangent computations with progress bar
sample$compute_tangents(progress = TRUE)   # 3-8x faster

# Parallel vectorization
sample$compute_vecs(progress = TRUE)       # 2-4x speedup

# Parallel Frechet mean computation
sample$compute_fmean(progress = TRUE)      # 2-5x faster for large samples

Batch Loading with Parallel I/O

For very large Parquet datasets, use batch loading with parallel I/O and memory management:

# Load specific subset in batches
subset_conns <- sample$load_connectomes_batched(
  indices = 1:500,        # Load first 500 matrices
  batch_size = 50,        # 50 matrices per batch
  progress = TRUE         # Show progress
)

# This loads 500 matrices in 10 batches, clearing cache between batches
# Each batch is loaded in parallel for 5-10x speedup

Performance Comparison

# Sequential (default if not configured)
set_parallel_plan("sequential")
system.time(sample$compute_tangents())  # Baseline

# Parallel with 4 workers
set_parallel_plan("multisession", workers = 4)
system.time(sample$compute_tangents())  # 3-4x faster

# Parallel with 8 workers
set_parallel_plan("multisession", workers = 8)
system.time(sample$compute_tangents())  # 6-8x faster

Progress Reporting

Enable progress bars to monitor long-running operations:

# Install progressr for progress bars (optional)
install.packages("progressr")

# Enable parallel processing
set_parallel_plan("multisession", workers = 4)

# All operations support progress parameter
sample$compute_tangents(progress = TRUE)
sample$compute_vecs(progress = TRUE)
sample$compute_fmean(progress = TRUE)

# Batch loading with progress
conns <- sample$load_connectomes_batched(
  indices = 1:1000,
  batch_size = 100,
  progress = TRUE  # Shows "Batch 1/10: loading matrices 1-100"
)

Best Practices

1. Choose appropriate worker count:

# Conservative (leave cores for system)
set_parallel_plan("multisession", workers = parallel::detectCores() - 1)

# Maximum performance (use all cores)
set_parallel_plan("multisession", workers = parallel::detectCores())

2. Memory management with parallelization:

# Each worker maintains its own copy of cached data
# For 4 workers, cache_size = 20, and 200x200 double matrices (~320 KB each):
# Memory ≈ 4 workers × 20 matrices × 320 KB ≈ 25 MB

# Use smaller cache with more workers
backend <- create_parquet_backend("dataset", cache_size = 10)
set_parallel_plan("multisession", workers = 8)
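The memory estimate above can be reproduced with a small helper (hypothetical, not part of riemtan):

```r
# Hypothetical helper: estimated peak cache memory across workers, in MiB
estimate_cache_mb <- function(workers, cache_size, matrix_dim) {
  bytes_per_matrix <- matrix_dim^2 * 8  # double precision
  workers * cache_size * bytes_per_matrix / 1024^2
}
estimate_cache_mb(4, 20, 200)  # about 24 MiB
```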

3. Reset when done:

# Compute with parallelization
set_parallel_plan("multisession", workers = 4)
sample$compute_tangents(progress = TRUE)
sample$compute_fmean(progress = TRUE)

# Reset to free worker resources
reset_parallel_plan()

Auto-Detection

Parallel processing is automatically disabled for small datasets to avoid overhead:

# Small dataset (n < 10): Uses sequential processing automatically
small_sample <- CSample$new(conns = small_connectomes[1:5], metric_obj = airm)
small_sample$compute_tangents()  # Sequential (no overhead)

# Large dataset (n >= 10): Uses parallel processing automatically
large_sample <- CSample$new(backend = backend, metric_obj = airm)
large_sample$compute_tangents()  # Parallel (if plan is active)

Parallel Strategies

Different strategies for different environments:

# multisession: Works on all platforms (Windows, Mac, Linux)
set_parallel_plan("multisession", workers = 4)

# multicore: Unix only, lower overhead
set_parallel_plan("multicore", workers = 4)  # Auto-fallback to multisession on Windows

# cluster: For remote/distributed computing
set_parallel_plan("cluster", workers = c("node1", "node2"))

Metadata Access

Access metadata stored with your Parquet data:

# Get metadata
metadata <- backend$get_metadata()

# Access subject IDs
subject_ids <- metadata$subject_ids

# Access provenance information
provenance <- metadata$provenance
print(provenance$study)
print(provenance$preprocessing)

Advanced: Custom File Patterns

Customize the Parquet file naming pattern:

write_connectomes_to_parquet(
  connectomes,
  output_dir = "custom_naming",
  file_pattern = "conn_%03d.parquet"
)

# Files will be named: conn_001.parquet, conn_002.parquet, ...

Troubleshooting

Large Datasets

For very large datasets (1000+ matrices):

# Use minimal cache
backend <- ParquetBackend$new("large_dataset", cache_size = 3)

# Compute statistics without loading all matrices at once
sample <- CSample$new(backend = backend, metric_obj = airm)
sample$compute_tangents()
sample$compute_vecs()

# Operations that don't need all matrices in memory
sample$compute_fmean(batch_size = 32)  # Uses batching

Memory Monitoring

# Check cache usage
cache_size <- backend$get_cache_size()
print(paste("Cached matrices:", cache_size))

# Free memory when needed
backend$clear_cache()

Summary

The Parquet backend provides:

- Lazy, on-demand loading of SPD matrices with a configurable cache
- Substantially lower memory usage for large samples
- Metadata and provenance stored with the data
- Parallel I/O and computation via the futureverse framework
- Drop-in compatibility with the traditional list-based CSample interface

For small to medium datasets that fit in memory, the traditional list approach remains perfectly valid. For large brain connectome studies, the Parquet backend enables analysis at scale.