Skip to content

Plugin System

pydeflate v2.2+ includes a plugin system for registering custom data sources without modifying the package code. This enables integration with proprietary data, alternative public sources, or custom calculations.

Why Use Plugins?

Built-in sources (IMF, World Bank, OECD DAC) may not cover all use cases:

  • Regional central bank data
  • Proprietary economic forecasts
  • Custom deflator calculations
  • Alternative exchange rate sources
  • Internal company data

Plugins let you extend pydeflate with custom sources while maintaining the same API.

Quick Start

Here's a minimal plugin:

from pydeflate.plugins import register_source
import pandas as pd

@register_source("my_central_bank")
class MyCentralBankSource:
    """Minimal custom source that reads central bank data from a CSV file.

    The CSV is expected to already use pydeflate's column names: the three
    index columns plus ``pydeflate_EXCHANGE`` and at least
    ``pydeflate_NGDP_D``.
    """

    def __init__(self, update: bool = False):
        """Load the data.

        Args:
            update: Accepted for interface compatibility. This minimal
                example always re-reads the CSV, so the flag has no effect.
        """
        self.name = "my_central_bank"
        self._idx = ["pydeflate_year", "pydeflate_entity_code", "pydeflate_iso3"]
        self.data = self._load_data(update)

    def _load_data(self, update: bool) -> pd.DataFrame:
        """Load data from central bank API or file."""
        # Example: Load from CSV
        return pd.read_csv("central_bank_data.csv")

    def lcu_usd_exchange(self) -> pd.DataFrame:
        """Return the index columns plus the exchange rate column."""
        return self.data[self._idx + ["pydeflate_EXCHANGE"]]

    def price_deflator(self, kind: str = "NGDP_D") -> pd.DataFrame:
        """Return the index columns plus the requested deflator column.

        Args:
            kind: Deflator type; the column looked up is ``pydeflate_{kind}``.

        Raises:
            ValueError: If the data has no column for ``kind``.
        """
        column = f"pydeflate_{kind}"

        # Fail with a clear message instead of an opaque pandas KeyError.
        if column not in self.data.columns:
            raise ValueError(
                f"Deflator type '{kind}' not available in {self.name} source"
            )

        return self.data[self._idx + [column]]

    def validate(self) -> None:
        """Check that every required column is present.

        Raises:
            ValueError: If any index, exchange or GDP deflator column is
                missing.
        """
        required_cols = self._idx + ["pydeflate_EXCHANGE", "pydeflate_NGDP_D"]
        missing = [c for c in required_cols if c not in self.data.columns]
        if missing:
            raise ValueError(f"Missing columns: {missing}")

# Plugin is now registered and ready to use

Plugin Interface

Plugins must implement the SourceProtocol:

Required Methods

__init__(self, update: bool = False)

Initialize the source and load data.

Parameters: - update: If True, download fresh data. If False, use cached data.

Required attributes: - self.name: Source name (string) - self._idx: Index columns (must be ["pydeflate_year", "pydeflate_entity_code", "pydeflate_iso3"]) - self.data: DataFrame with deflator/exchange data

lcu_usd_exchange() -> pd.DataFrame

Return exchange rates from local currency to USD.

Returns: DataFrame with columns: - pydeflate_year - pydeflate_entity_code - pydeflate_iso3 - pydeflate_EXCHANGE: LCU per USD

price_deflator(kind: str) -> pd.DataFrame

Return price deflator data.

Parameters: - kind: Deflator type (e.g., "NGDP_D" for GDP deflator, "PCPI" for CPI)

Returns: DataFrame with columns: - pydeflate_year - pydeflate_entity_code - pydeflate_iso3 - pydeflate_{kind}: Deflator index

validate() -> None

Validate data format and completeness.

Raises: An exception describing the problem (for example DataSourceError or SchemaValidationError) if validation fails

Complete Example

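A production-ready plugin with caching, error handling, and data validation. It provides the GDP deflator; requesting a deflator type whose column is not in the data raises a ValueError: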

from pydeflate.plugins import register_source
from pydeflate.exceptions import DataSourceError, NetworkError
import pandas as pd
import requests
from pathlib import Path
import json

@register_source("eurostat")
class EurostatSource:
    """Custom pydeflate source backed by Eurostat data.

    Provides GDP deflators and exchange rates for EU countries. Data is
    cached as a parquet file under ``~/.pydeflate/eurostat`` and validated
    as soon as it has been loaded.
    """

    # Full column layout of the frames this source builds. Passing it to the
    # DataFrame constructor keeps the schema intact even with zero records.
    _COLUMNS = [
        "pydeflate_year",
        "pydeflate_entity_code",
        "pydeflate_iso3",
        "pydeflate_NGDP_D",
        "pydeflate_EXCHANGE",
    ]

    def __init__(self, update: bool = False):
        """Load the Eurostat data and validate it.

        Args:
            update: If True, skip the cache and fetch fresh data.

        Raises:
            DataSourceError: If the cache cannot be read or written, or the
                loaded data fails validation.
            NetworkError: If fetching from the API fails.
        """
        self.name = "eurostat"
        self._idx = ["pydeflate_year", "pydeflate_entity_code", "pydeflate_iso3"]

        # Cache directory
        self.cache_dir = Path.home() / ".pydeflate" / "eurostat"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Load or download data
        self.data = self._load_or_fetch(update)

        # Validate
        self.validate()

    def _load_or_fetch(self, update: bool) -> pd.DataFrame:
        """Load from cache or fetch from the Eurostat API.

        Args:
            update: If True, ignore any cached file and fetch again.

        Returns:
            The data in pydeflate format.

        Raises:
            DataSourceError: If the cache file cannot be read or written.
            NetworkError: If the fetch fails.
        """
        cache_file = self.cache_dir / "eurostat_data.parquet"

        # Try cache first
        if not update and cache_file.exists():
            try:
                return pd.read_parquet(cache_file)
            except Exception as e:
                raise DataSourceError(f"Failed to load cached data: {e}") from e

        # Fetch from API. Only the fetch sits inside this try block, so that a
        # failed cache write below is not reported as a network problem.
        try:
            data = self._fetch_from_api()
        except Exception as e:
            raise NetworkError(f"Failed to fetch Eurostat data: {e}") from e

        # Save to cache
        try:
            data.to_parquet(cache_file)
        except Exception as e:
            raise DataSourceError(f"Failed to write cache file: {e}") from e

        return data

    def _fetch_from_api(self) -> pd.DataFrame:
        """Fetch data from the Eurostat API and convert it to pydeflate format."""
        # Example API call (simplified)
        url = "https://ec.europa.eu/eurostat/api/dissemination/..."

        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Parse response (simplified)
        raw_data = response.json()

        # Transform to pydeflate format
        return self._transform_data(raw_data)

    def _transform_data(self, raw_data: dict) -> pd.DataFrame:
        """Transform a Eurostat payload to pydeflate format.

        Args:
            raw_data: Parsed response. This example reads
                ``raw_data['countries']``; each country provides ``iso3``,
                ``code`` and a ``time_series`` list whose items hold
                ``year`` and, optionally, ``gdp_deflator`` and
                ``exchange_rate``.

        Returns:
            One row per country and year. The frame always carries the full
            column set, even when the payload contains no observations.
        """
        records = []

        for country in raw_data["countries"]:
            iso3 = country["iso3"]
            entity_code = country["code"]

            for year_data in country["time_series"]:
                records.append({
                    "pydeflate_year": int(year_data["year"]),
                    "pydeflate_entity_code": entity_code,
                    "pydeflate_iso3": iso3,
                    # Observations missing from the payload become nulls.
                    "pydeflate_NGDP_D": year_data.get("gdp_deflator"),
                    "pydeflate_EXCHANGE": year_data.get("exchange_rate"),
                })

        # Explicit columns: with no records the frame would otherwise have no
        # columns, and validate() would report missing columns, not "no data".
        return pd.DataFrame(records, columns=self._COLUMNS)

    def lcu_usd_exchange(self) -> pd.DataFrame:
        """Return the index columns plus the exchange rate column."""
        return self.data[self._idx + ["pydeflate_EXCHANGE"]]

    def price_deflator(self, kind: str = "NGDP_D") -> pd.DataFrame:
        """Return the index columns plus a deflator (GDP deflator by default).

        Args:
            kind: Deflator type; the column looked up is ``pydeflate_{kind}``.

        Raises:
            ValueError: If the data has no column for ``kind``.
        """
        col_name = f"pydeflate_{kind}"

        if col_name not in self.data.columns:
            raise ValueError(f"Deflator type '{kind}' not available in Eurostat source")

        return self.data[self._idx + [col_name]]

    def validate(self) -> None:
        """Validate the structure and basic quality of the loaded data.

        Raises:
            DataSourceError: If required columns are missing, the data is
                empty, the exchange rate column is not numeric, or any index
                column contains nulls.
        """
        # Check required columns
        required = self._idx + ["pydeflate_EXCHANGE", "pydeflate_NGDP_D"]
        missing = [c for c in required if c not in self.data.columns]

        if missing:
            raise DataSourceError(f"Missing required columns: {missing}")

        # Check for data
        if self.data.empty:
            raise DataSourceError("No data loaded")

        # Check data types
        if not pd.api.types.is_numeric_dtype(self.data["pydeflate_EXCHANGE"]):
            raise DataSourceError("Exchange rate column must be numeric")

        # Check for nulls in critical columns
        null_counts = self.data[self._idx].isnull().sum()
        if null_counts.any():
            raise DataSourceError(f"Null values in index columns: {null_counts[null_counts > 0]}")

# Usage
from pydeflate import BaseDeflate

# The @register_source decorator has already registered the plugin,
# so it can now be used like the built-in sources.
# update=True skips the parquet cache and fetches fresh data from the API.
source = EurostatSource(update=True)
print(f"Loaded {len(source.data)} records from Eurostat")

Using Custom Sources

List Available Sources

from pydeflate.plugins import list_sources

# Built-in sources and registered plugins are listed together
sources = list_sources()
print(sources)
# Example output:
# ['DAC', 'IMF', 'World Bank', 'my_central_bank', 'eurostat']

Check if Source is Registered

from pydeflate.plugins import is_source_registered

# Only run plugin-dependent code when the source has actually been registered
if is_source_registered("eurostat"):
    print("Eurostat plugin available")

Get Source Instance

from pydeflate.plugins import get_source

# Get an instance of the custom source by its registered name;
# update=False uses cached data instead of downloading fresh data
eurostat = get_source("eurostat", update=False)

# Access data: both frames carry the index columns plus one value column
# (pydeflate_EXCHANGE and pydeflate_NGDP_D respectively)
exchange_data = eurostat.lcu_usd_exchange()
deflator_data = eurostat.price_deflator("NGDP_D")

Use with BaseDeflate

Integrate custom source with pydeflate's deflation engine:

from pydeflate.core.api import BaseDeflate
from pydeflate.plugins import get_source
import pandas as pd

# Get custom source (registered under the name "eurostat")
eurostat = get_source("eurostat")

# Create deflator using custom source
# NOTE(review): confirm BaseDeflate accepts a source instance via `source=`
deflator = BaseDeflate(source=eurostat)

# Your data: one row per country/year with the value to deflate
data = {
    'country': ['FRA', 'DEU', 'ITA'],
    'year': [2015, 2016, 2017],
    'value': [1000, 1100, 1200]
}
df = pd.DataFrame(data)

# Deflate using custom source
# NOTE(review): currencies are given as ISO3 country codes here, and no year
# column is named — confirm both against the BaseDeflate.deflate signature
result = deflator.deflate(
    data=df,
    base_year=2015,
    source_currency="FRA",
    target_currency="USA",
    id_column="country",
    value_column="value",
    target_value_column="value_constant"
)

print(result)

Data Format Requirements

Your plugin's data must follow pydeflate's schema:

Index Columns (Required)

self._idx = [
    "pydeflate_year",         # int: Year
    "pydeflate_entity_code",  # str: Source-specific country code
    "pydeflate_iso3"          # str: ISO3 country code
]

Exchange Rate Column (Required)

"pydeflate_EXCHANGE"  # float: Local currency per USD

Deflator Columns (At Least One Required)

"pydeflate_NGDP_D"   # float: GDP deflator (index)
"pydeflate_PCPI"     # float: CPI (index)
"pydeflate_PCPIE"    # float: CPI end-of-period (index)
# ... add custom deflator types

Example Data Structure

data = pd.DataFrame({
    'pydeflate_year': [2015, 2016, 2017],
    'pydeflate_entity_code': ['USA', 'USA', 'USA'],
    'pydeflate_iso3': ['USA', 'USA', 'USA'],
    'pydeflate_EXCHANGE': [1.0, 1.0, 1.0],  # USD to USD
    'pydeflate_NGDP_D': [100.0, 102.0, 104.0],
    'pydeflate_PCPI': [100.0, 101.5, 103.2]
})

Error Handling in Plugins

Use pydeflate's exception hierarchy:

import requests
from pydeflate.exceptions import (
    DataSourceError,
    NetworkError,
    SchemaValidationError
)
from pydeflate.plugins import register_source

@register_source("my_source")
class MySource:
    """Skeleton source showing how to map failures onto pydeflate exceptions.

    Only the error handling is shown here; a complete plugin must also
    provide ``_load_data`` and the other required attributes and methods.
    """

    def __init__(self, update: bool = False):
        """Load and validate the data, translating low-level failures.

        Args:
            update: Passed through to ``_load_data``.

        Raises:
            NetworkError: If the download fails.
            DataSourceError: If the data cannot be parsed or is empty.
            SchemaValidationError: If required columns are missing.
        """
        try:
            self.data = self._load_data(update)
        except requests.RequestException as e:
            # `from e` keeps the original exception attached as the cause,
            # so the underlying traceback is not lost.
            raise NetworkError(f"Failed to download data: {e}") from e
        except ValueError as e:
            raise DataSourceError(f"Data parsing error: {e}") from e

        self.validate()

    def validate(self):
        """Check that data was loaded and the required columns exist.

        Raises:
            DataSourceError: If the data is empty.
            SchemaValidationError: If a required column is missing.
        """
        if self.data.empty:
            raise DataSourceError("No data loaded")

        required_cols = ["pydeflate_year", "pydeflate_iso3", "pydeflate_EXCHANGE"]
        missing = [c for c in required_cols if c not in self.data.columns]

        if missing:
            raise SchemaValidationError(f"Missing required columns: {missing}")

Testing Plugins

Write tests for your custom source:

import pytest
from pydeflate.plugins import get_source, is_source_registered

def test_plugin_registered():
    """The plugin name must be known to the registry."""
    registered = is_source_registered("my_central_bank")
    assert registered

def test_plugin_loads_data():
    """The plugin instance exposes its name and a non-empty dataset."""
    plugin = get_source("my_central_bank", update=False)

    # Identity of the source
    assert plugin.name == "my_central_bank"

    # Data is present and has at least one row
    assert hasattr(plugin, 'data')
    row_count = len(plugin.data)
    assert row_count > 0

def test_exchange_rates():
    """Test exchange rate data format."""
    source = get_source("my_central_bank")
    exchange_data = source.lcu_usd_exchange()

    # Check columns: all three index columns plus the exchange rate
    expected = [
        "pydeflate_year",
        "pydeflate_entity_code",
        "pydeflate_iso3",
        "pydeflate_EXCHANGE",
    ]
    missing = [c for c in expected if c not in exchange_data.columns]
    assert not missing, f"Missing columns: {missing}"

    # Check data types. dtype.kind == "f" accepts any float width, whereas
    # comparing the dtype to `float` only matches float64.
    assert exchange_data["pydeflate_EXCHANGE"].dtype.kind == "f"

def test_deflator():
    """The GDP deflator frame has its value column and only positive values."""
    frame = get_source("my_central_bank").price_deflator("NGDP_D")

    # The value column must be present
    assert "pydeflate_NGDP_D" in frame.columns

    # Every deflator value must be strictly positive
    values = frame["pydeflate_NGDP_D"]
    assert values.gt(0).all()

def test_validation():
    """validate() must accept the data the plugin loads."""
    plugin = get_source("my_central_bank")

    # A well-formed source validates silently; any exception fails the test
    plugin.validate()

Best Practices

1. Use Caching

Download data once, cache for reuse:

def _load_or_fetch(self, update: bool):
    """Return the dataset, using the parquet cache unless a fetch is needed.

    A fetch happens when ``update`` is true or no cache file exists; the
    fetched data is written to the cache before being returned.
    """
    cached = self.cache_dir / "data.parquet"

    if update or not cached.exists():
        fresh = self._fetch_from_api()
        fresh.to_parquet(cached)
        return fresh

    return pd.read_parquet(cached)

2. Handle Missing Data

def price_deflator(self, kind: str = "NGDP_D"):
    """Return the index columns plus the ``pydeflate_{kind}`` column.

    Raises:
        ValueError: If the data has no column for ``kind``.
    """
    wanted = f"pydeflate_{kind}"

    if wanted in self.data.columns:
        return self.data[[*self._idx, wanted]]

    raise ValueError(f"Deflator '{kind}' not available")

3. Validate Thoroughly

def validate(self):
    """Run structural and quality checks on the loaded data.

    Checks, in order: the data is not empty, the index and exchange rate
    columns exist, and no exchange rate is zero or negative.

    Raises:
        DataSourceError: If there is no data or a rate is not positive.
        SchemaValidationError: If a required column is missing.
    """
    frame = self.data

    # Structure
    if frame.empty:
        raise DataSourceError("No data")

    # Columns
    expected = self._idx + ["pydeflate_EXCHANGE"]
    missing = [name for name in expected if name not in frame.columns]
    if missing:
        raise SchemaValidationError(f"Missing: {missing}")

    # Data quality
    non_positive = frame["pydeflate_EXCHANGE"].le(0)
    if non_positive.any():
        raise DataSourceError("Exchange rates must be positive")

4. Document Your Plugin

@register_source("ecb")
class ECBSource:
    """
    European Central Bank data source.

    Provides:
    - EUR/USD and other major currency exchange rates
    - HICP (Harmonized Index of Consumer Prices) for EU countries

    Data coverage:
    - Countries: EU27 + UK, Switzerland, Norway
    - Time range: 1999-present
    - Update frequency: Daily (exchange rates), Monthly (HICP)

    Usage (update=True requests fresh data instead of cached data):
        >>> from pydeflate.plugins import get_source
        >>> ecb = get_source("ecb", update=True)
        >>> rates = ecb.lcu_usd_exchange()
    """
    # Implementation omitted: this snippet only illustrates the docstring layout.

Next Steps