Creating Custom Backends
This guide shows you how to create custom storage backends for polystore.
Overview
Custom backends allow you to:
Integrate with custom storage systems
Add support for new file formats
Implement specialized I/O patterns
Connect to remote services (S3, databases, etc.)
Inheriting from one of polystore's backend base classes gives your backend automatic registration and a consistent API.
Basic Backend
The simplest backend implements the StorageBackend interface:
from polystore import StorageBackend
import json
from pathlib import Path
class JSONStorageBackend(StorageBackend):
    """Backend that stores every object as a JSON file under ``base_dir``.

    All paths passed to the methods are interpreted relative to ``base_dir``.
    """

    _backend_type = 'json_storage'  # Auto-registration key

    def __init__(self, base_dir="./json_storage"):
        """Create the backend and make sure ``base_dir`` exists."""
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def save(self, data, file_path, **kwargs):
        """Save ``data`` as JSON, creating parent directories as needed.

        Raises:
            TypeError: If ``data`` is not JSON-serializable.
        """
        # Serialize before touching the file so that unserializable data
        # cannot leave a truncated or half-written file behind.
        text = json.dumps(data, indent=2)
        full_path = self.base_dir / file_path
        full_path.parent.mkdir(parents=True, exist_ok=True)
        # JSON is UTF-8 by specification; don't rely on the platform default.
        with open(full_path, 'w', encoding='utf-8') as f:
            f.write(text)

    def load(self, file_path, **kwargs):
        """Load and return the JSON document stored at ``file_path``.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        full_path = self.base_dir / file_path
        if not full_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        with open(full_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def save_batch(self, data_list, output_paths, **kwargs):
        """Save each item of ``data_list`` to the matching output path."""
        for data, path in zip(data_list, output_paths):
            self.save(data, path, **kwargs)

    def load_batch(self, file_paths, **kwargs):
        """Load several files and return their contents in the same order."""
        return [self.load(path, **kwargs) for path in file_paths]

    def list_files(self, directory, pattern=None,
                   extensions=None, recursive=False, **kwargs):
        """List files below ``directory`` as paths relative to ``base_dir``.

        Args:
            directory: Directory to search, relative to ``base_dir``.
            pattern: Optional glob pattern; defaults to ``"*.json"``.
            extensions: Optional collection of file suffixes to keep.
            recursive: Also search subdirectories.

        Returns:
            A list of path strings; empty if ``directory`` does not exist.
        """
        dir_path = self.base_dir / directory
        if not dir_path.exists():
            return []

        glob_pattern = pattern or "*.json"
        matches = (dir_path.rglob(glob_pattern) if recursive
                   else dir_path.glob(glob_pattern))
        # A directory may also match the glob; only real files are listed.
        files = (p for p in matches if p.is_file())

        if extensions is not None:
            # NOTE(review): assumes extensions are dot-prefixed suffixes such
            # as {".json"}, compared case-insensitively - confirm against the
            # StorageBackend contract.
            wanted = {ext.lower() for ext in extensions}
            files = (p for p in files if p.suffix.lower() in wanted)

        return [str(p.relative_to(self.base_dir)) for p in files]

    def exists(self, path):
        """Return True if ``path`` exists (file or directory)."""
        return (self.base_dir / path).exists()

    def is_file(self, path):
        """Return True if ``path`` is a regular file."""
        return (self.base_dir / path).is_file()

    def is_dir(self, path):
        """Return True if ``path`` is a directory."""
        return (self.base_dir / path).is_dir()

    def list_dir(self, path):
        """Return the names of the entries directly inside ``path``.

        Raises:
            NotADirectoryError: If ``path`` is not an existing directory.
        """
        dir_path = self.base_dir / path
        if not dir_path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")
        return [f.name for f in dir_path.iterdir()]
Usage:
from polystore import BackendRegistry, FileManager
# Backend is auto-registered
backends = BackendRegistry()
manager = FileManager(backends)

# Use the custom backend: write a document, then read it back
payload = {"key": "value", "count": 42}
manager.save(payload, "config.json", backend="json_storage")
restored = manager.load("config.json", backend="json_storage")
print(restored)  # {'key': 'value', 'count': 42}
Read-Only Backend
For read-only data sources, inherit from ReadOnlyBackend:
from polystore import ReadOnlyBackend
import requests
class HTTPBackend(ReadOnlyBackend):
    """Read-only backend that fetches resources below a base URL over HTTP."""

    _backend_type = 'http'

    def __init__(self, base_url, timeout=30):
        """Configure the backend.

        Args:
            base_url: URL prefix for every resource; a trailing slash is
                stripped.
            timeout: Seconds to wait for the server on each request.
                ``requests`` applies no timeout unless one is given, so an
                unresponsive server would otherwise block forever.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def _url(self, path):
        """Build the absolute URL for ``path``."""
        return f"{self.base_url}/{path}"

    def load(self, file_path, **kwargs):
        """Fetch ``file_path`` and return the response body as bytes.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        response = requests.get(self._url(file_path), timeout=self.timeout)
        response.raise_for_status()
        return response.content

    def load_batch(self, file_paths, **kwargs):
        """Fetch several resources and return their bodies in order."""
        return [self.load(path, **kwargs) for path in file_paths]

    def list_files(self, directory, **kwargs):
        """List files (not implemented for HTTP)."""
        raise NotImplementedError("HTTP backend doesn't support listing")

    def exists(self, path):
        """Return True if a HEAD request for ``path`` ends in status 200."""
        # HEAD does not follow redirects by default, while the GET in load()
        # does; follow them here so both methods agree on redirected URLs.
        response = requests.head(
            self._url(path), allow_redirects=True, timeout=self.timeout
        )
        return response.status_code == 200

    def is_file(self, path):
        """Return True if the resource exists; HTTP has no directories."""
        return self.exists(path)

    def is_dir(self, path):
        """Always false for HTTP."""
        return False

    def list_dir(self, path):
        """Not supported."""
        raise NotImplementedError()
Advanced Features
Context Manager Support
Implement cleanup with context managers:
from polystore import StorageBackend
import tempfile
import shutil
class TemporaryBackend(StorageBackend):
    """Backend whose scratch directory exists only inside a ``with`` block."""

    _backend_type = 'temporary'

    def __init__(self):
        # The directory is created in __enter__, not here, so a backend that
        # is never entered leaves nothing on disk.
        self.temp_dir = None

    def __enter__(self):
        """Create the temporary directory and return the backend."""
        self.temp_dir = tempfile.mkdtemp()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Delete the temporary directory; exceptions are not suppressed."""
        if self.temp_dir:
            # Forget the path before deleting so that a second exit does not
            # try to remove a directory that is already gone.
            temp_dir, self.temp_dir = self.temp_dir, None
            shutil.rmtree(temp_dir)

    # Implement required methods...
Usage:
with TemporaryBackend() as temp_backend:
    # A plain dict maps the backend name used below to the instance.
    manager = FileManager({'temp': temp_backend})
    manager.save(data, "temp_file.npy", backend="temp")
# Files automatically cleaned up on exit
Compression Support
Add compression/decompression:
from polystore import StorageBackend
import gzip
import pickle
class CompressedBackend(StorageBackend):
    """Backend that stores objects as gzip-compressed pickles.

    NOTE: unpickling can execute arbitrary code, so only load files that
    come from a trusted source.
    """

    _backend_type = 'compressed'

    def save(self, data, file_path, **kwargs):
        """Pickle ``data`` into a gzip-compressed file at ``file_path``."""
        with gzip.open(file_path, mode='wb') as compressed:
            pickle.dump(data, compressed)

    def load(self, file_path, **kwargs):
        """Decompress ``file_path`` and return the unpickled object."""
        with gzip.open(file_path, mode='rb') as compressed:
            return pickle.load(compressed)

    # Implement other required methods...
Caching Layer
Add caching for expensive operations:
from polystore import StorageBackend
from functools import lru_cache
class CachedBackend(StorageBackend):
    """Wrap another backend and keep recent ``load`` results in an LRU cache.

    Cached objects are returned by reference: loading the same path twice
    yields the same object, so callers should not mutate what they get back.
    """

    _backend_type = 'cached'

    def __init__(self, wrapped_backend, cache_size=128):
        """Wrap ``wrapped_backend``, caching up to ``cache_size`` loads."""
        self.backend = wrapped_backend
        # Wrapping the bound method here (instead of decorating the method
        # on the class) gives every instance its own cache.
        self._cached_load = lru_cache(maxsize=cache_size)(self._do_load)

    def _do_load(self, file_path):
        """Actual load operation; ``file_path`` must be hashable."""
        return self.backend.load(file_path)

    def load(self, file_path, **kwargs):
        """Load ``file_path``, serving repeated plain loads from the cache."""
        if kwargs:
            # Options may change what the wrapped backend returns, so such
            # calls bypass the cache instead of silently dropping the options.
            return self.backend.load(file_path, **kwargs)
        return self._cached_load(file_path)

    def save(self, data, file_path, **kwargs):
        """Save through the wrapped backend and invalidate the cache."""
        try:
            self.backend.save(data, file_path, **kwargs)
        finally:
            # lru_cache cannot evict a single key, so the whole cache is
            # dropped - also when the save fails, because the stored data
            # may already have changed.
            self._cached_load.cache_clear()

    # Delegate other methods to wrapped backend...
Database Backend Example
Here’s a complete example connecting to a database:
import pickle
import sqlite3
from contextlib import closing
from pathlib import Path

from polystore import StorageBackend
class SQLiteBackend(StorageBackend):
    """Backend using SQLite for storage.

    Every object is pickled into a BLOB keyed by its path. Each operation
    opens its own short-lived connection, which is closed even when the
    operation fails.

    NOTE: values are restored with ``pickle.loads``, which can execute
    arbitrary code - only open database files from a trusted source.
    """

    _backend_type = 'sqlite'

    _UPSERT_SQL = 'INSERT OR REPLACE INTO files (path, data) VALUES (?, ?)'
    _SELECT_DATA_SQL = 'SELECT data FROM files WHERE path = ?'

    def __init__(self, db_path="polystore.db"):
        """Open (or create) the database file at ``db_path``."""
        self.db_path = db_path
        self._init_db()

    def _connect(self):
        """Return a context manager yielding a connection it will close."""
        # sqlite3's own connection context manager only commits or rolls
        # back; closing() is what actually releases the connection.
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Initialize database schema."""
        with self._connect() as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS files (
                    path TEXT PRIMARY KEY,
                    data BLOB NOT NULL,
                    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

    def save(self, data, file_path, **kwargs):
        """Store ``data`` under ``file_path``, replacing any previous value."""
        serialized = pickle.dumps(data)
        # "with conn" commits on success and rolls back on error.
        with self._connect() as conn, conn:
            conn.execute(self._UPSERT_SQL, (str(file_path), serialized))

    def load(self, file_path, **kwargs):
        """Return the object stored under ``file_path``.

        Raises:
            FileNotFoundError: If no row exists for ``file_path``.
        """
        with self._connect() as conn:
            row = conn.execute(
                self._SELECT_DATA_SQL, (str(file_path),)
            ).fetchone()
        if row is None:
            raise FileNotFoundError(f"File not found: {file_path}")
        return pickle.loads(row[0])

    def save_batch(self, data_list, output_paths, **kwargs):
        """Store all items in one transaction (all or nothing)."""
        rows = (
            (str(path), pickle.dumps(data))
            for data, path in zip(data_list, output_paths)
        )
        with self._connect() as conn, conn:
            conn.executemany(self._UPSERT_SQL, rows)

    def load_batch(self, file_paths, **kwargs):
        """Return the objects stored under ``file_paths``, in order.

        Raises:
            FileNotFoundError: If any of the paths has no row.
        """
        results = []
        with self._connect() as conn:
            for path in file_paths:
                row = conn.execute(
                    self._SELECT_DATA_SQL, (str(path),)
                ).fetchone()
                if row is None:
                    raise FileNotFoundError(f"File not found: {path}")
                results.append(pickle.loads(row[0]))
        return results

    def list_files(self, directory, **kwargs):
        """Return every stored path that starts with ``directory/``."""
        # "%" and "_" are LIKE wildcards; escape them so a directory such as
        # "my_data" does not also match "myXdata". The backslash goes first
        # so the escapes added for the other two are not escaped again.
        prefix = str(directory)
        for special in ('\\', '%', '_'):
            prefix = prefix.replace(special, '\\' + special)
        with self._connect() as conn:
            cursor = conn.execute(
                "SELECT path FROM files WHERE path LIKE ? ESCAPE '\\'",
                (f"{prefix}/%",),
            )
            return [row[0] for row in cursor.fetchall()]

    def exists(self, path):
        """Return True if a row is stored under exactly ``path``."""
        with self._connect() as conn:
            row = conn.execute(
                'SELECT 1 FROM files WHERE path = ?', (str(path),)
            ).fetchone()
        return row is not None

    def is_file(self, path):
        """Every stored path is a file, so this equals ``exists``."""
        return self.exists(path)

    def is_dir(self, path):
        """Directories don't exist in flat database."""
        return False

    def list_dir(self, path):
        """Return the stored paths below ``path``.

        NOTE(review): this returns full paths at any depth, not just the
        names of immediate children - confirm what callers expect.
        """
        return self.list_files(path)
Testing Your Backend
Write comprehensive tests for your backend:
import pytest
import numpy as np
from my_package import MyCustomBackend
class TestMyCustomBackend:
    """Round-trip and error-path tests for ``MyCustomBackend``."""

    def setup_method(self):
        """Give each test its own backend instance."""
        self.backend = MyCustomBackend()

    def test_save_and_load(self):
        """An array that was saved loads back unchanged."""
        original = np.array([1, 2, 3])
        self.backend.save(original, "test.npy")
        restored = self.backend.load("test.npy")
        np.testing.assert_array_equal(original, restored)

    def test_batch_operations(self):
        """Batch save followed by batch load preserves order and content."""
        indices = range(3)
        arrays = [np.array([i]) for i in indices]
        paths = [f"test{i}.npy" for i in indices]

        self.backend.save_batch(arrays, paths)
        restored_arrays = self.backend.load_batch(paths)

        for expected, actual in zip(arrays, restored_arrays):
            np.testing.assert_array_equal(expected, actual)

    def test_file_not_found(self):
        """Loading a path that was never saved raises FileNotFoundError."""
        with pytest.raises(FileNotFoundError):
            self.backend.load("nonexistent.npy")
File Extension Filtering (Streaming Backends)
Streaming backends (Napari, Fiji) should declare which file types they support to avoid attempting to stream incompatible files (CSV, JSON, etc.).
Declare Supported Extensions:
from polystore import StreamingBackend
class MyStreamingBackend(StreamingBackend):
    """Custom streaming backend with extension filtering."""

    _backend_type = 'my_stream'
    VIEWER_TYPE = 'myviewer'
    SHM_PREFIX = 'myviewer_'

    # Declare which extensions this backend can stream
    SUPPORTED_EXTENSIONS = {
        '.tif', '.tiff', '.png', '.jpg', '.jpeg',
        '.roi.zip',  # Compound extensions work too
    }

    def save_batch(self, data_list, file_paths, **kwargs):
        """Stream the items that pass the base class's extension filter."""
        # The third return value (the skipped entries) is not needed here.
        streamable_data, streamable_paths, _skipped = (
            self._filter_streamable_files(data_list, file_paths)
        )

        # Stream only when something survived the filter.
        if streamable_data:
            for item, target in zip(streamable_data, streamable_paths):
                self._stream_data(item, target, **kwargs)
How It Works:
SUPPORTED_EXTENSIONS: Class attribute defining valid file extensions
_filter_streamable_files(): Base class method that filters data
Automatic Logging: Skipped files are logged at INFO level
Adding/Removing Support:
To add support for a new format, add its extension to the set:
# One extension per line keeps additions to a one-line diff.
SUPPORTED_EXTENSIONS = {
    '.tif',
    '.tiff',
    '.png',
    '.jpg',
    '.jpeg',
    '.roi.zip',
    '.myformat',  # Add your format
}
To remove support for a format, delete its extension from the set, or override the set in a subclass:
class RestrictedNapariBackend(NapariStreamingBackend):
    """Napari streaming backend limited to TIFF files."""

    # Replaces the inherited set rather than extending it.
    SUPPORTED_EXTENSIONS = {
        '.tif',
        '.tiff',
    }
Why This Pattern:
Prevents Crashes: CSV/JSON files won’t crash the viewer
Clear Contracts: Developers know what each backend supports
Extensible: Easy to add new formats
Consistent: All streaming backends use the same pattern
Best Practices
Auto-Registration: Always define _backend_type for automatic registration
Error Handling: Raise appropriate exceptions (FileNotFoundError, etc.)
Documentation: Document your backend’s specific features and limitations
Testing: Write comprehensive tests covering all methods
Thread Safety: Consider thread-safety if backend maintains state
Resource Cleanup: Implement __del__ or context managers for cleanup
Type Hints: Add type hints for better IDE support
Logging: Use logging for debugging and monitoring
Extension Filtering: For streaming backends, always use SUPPORTED_EXTENSIONS
Publishing Your Backend
To share your backend:
Package It: Create a Python package with your backend
Dependencies: List polystore as a dependency
Documentation: Document installation and usage
Examples: Provide example code
Tests: Include comprehensive tests
Example pyproject.toml:
[project]
name = "polystore-mybackend"
version = "0.1.0"
dependencies = [
"polystore>=0.1.0",
# Your backend's dependencies
]
Example usage in your package:
# my_package/__init__.py
from polystore import StorageBackend
class MyBackend(StorageBackend):
    """Storage backend published by this package."""

    # Key used for automatic registration.
    _backend_type = 'mybackend'

    # Implementation...
__all__ = ['MyBackend']
Users can then install and use:
pip install polystore-mybackend
import my_package # Imports and registers backend
from polystore import BackendRegistry, FileManager
# Importing my_package above is what makes the backend known here.
backends = BackendRegistry()
assert 'mybackend' in backends  # Auto-registered!
See Also
Storage Backends - Backend API reference
Backend Registry - Registration system
FileManager - Using backends via FileManager