Package futureexpert

Sub-modules

futureexpert.associator

Contains the models with the configuration for the associator and the result format.

futureexpert.checkin

Contains the models with the configuration for CHECK-IN.

futureexpert.expert_client
futureexpert.forecast

Contains the models with the configuration for the forecast and the result format.

futureexpert.forecast_consistency

Contains the models with the configuration for the hierarchical reconciliation and the result format.

futureexpert.matcher

Contains the models with the configuration for the matcher and the result format.

futureexpert.plot

Contains all the functionality to plot the checked-in time series and the forecast and backtesting results.

futureexpert.pool
futureexpert.shaper
futureexpert.shared_models

Shared models used across multiple modules.

Classes

class ActualsCovsConfiguration (**data: Any)
class ActualsCovsConfiguration(BaseModel):
    """Configuration of actuals and covariates via name and lag.

    Parameters
    ----------
    actuals_name: builtins.str
        Name of the time series.
    covs_configurations: builtins.list[futureexpert.shared_models.CovariateRef]
        List of covariates.
    """
    actuals_name: str
    covs_configurations: list[CovariateRef]

Configuration of actuals and covariates via name and lag.

Parameters

actuals_name : builtins.str
Name of the time series.
covs_configurations : builtins.list[CovariateRef]
List of covariates.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var actuals_name : str
var covs_configurations : list[CovariateRef]
var model_config
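A minimal construction sketch. Stdlib dataclasses stand in for the pydantic models here, and the series and covariate names (`monthly_sales`, `ad_spend`) are made up for illustration:

```python
from dataclasses import dataclass

@dataclass
class CovariateRef:  # stand-in for futureexpert.shared_models.CovariateRef
    name: str        # name of the covariate
    lag: int         # lag by which the covariate was used

@dataclass
class ActualsCovsConfiguration:  # stand-in for the pydantic model
    actuals_name: str                        # name of the time series
    covs_configurations: list[CovariateRef]  # covariates paired with the actuals

config = ActualsCovsConfiguration(
    actuals_name='monthly_sales',  # hypothetical series name
    covs_configurations=[CovariateRef(name='ad_spend', lag=2)],
)
```

With the real classes, invalid keyword arguments would additionally raise a pydantic `ValidationError` at construction time.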
class AssociatorConfig (**data: Any)
class AssociatorConfig(BaseConfig):
    """Service configuration.

    Parameters
    ----------
    data_selection: futureexpert.associator.DataSelection
        Configuration on the selection of time series used for carrying out the service.
    trend_detection: futureexpert.associator.TrendDetectionConfiguration
        Configuration for trend detection.
    clustering: futureexpert.associator.ClusteringConfiguration
        Configuration for clustering.
    report_note: builtins.str
        User-defined string to be included in the report.
    db_name: typing.Optional[builtins.str]
        Only accessible for internal use. Name of the database to use for storing the results.
    """

    data_selection: DataSelection = Field(default_factory=DataSelection)
    trend_detection: TrendDetectionConfiguration = Field(default_factory=TrendDetectionConfiguration)
    clustering: ClusteringConfiguration = Field(default_factory=ClusteringConfiguration)
    report_note: str
    db_name: Optional[str] = None

Service configuration.

Parameters

data_selection : DataSelection
Configuration on the selection of time series used for carrying out the service.
trend_detection : TrendDetectionConfiguration
Configuration for trend detection.
clustering : ClusteringConfiguration
Configuration for clustering.
report_note : builtins.str
User-defined string to be included in the report.
db_name : typing.Optional[builtins.str]
Only accessible for internal use. Name of the database to use for storing the results.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • BaseConfig
  • pydantic.main.BaseModel

Class variables

var clustering : ClusteringConfiguration
var data_selection : DataSelection
var db_name : str | None
var model_config
var report_note : str
var trend_detection : TrendDetectionConfiguration
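Because the nested configurations carry `default_factory` defaults, `report_note` is the only required argument, and every instance gets its own fresh sub-configuration. A rough stdlib sketch of that defaulting behaviour (dataclasses stand in for the pydantic models; only a subset of the real fields is shown):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DataSelection:  # stand-in; only one of the real fields is shown
    version: Optional[str] = None

@dataclass
class AssociatorConfig:  # stand-in for the pydantic model
    report_note: str  # the only field without a default
    data_selection: DataSelection = field(default_factory=DataSelection)

cfg = AssociatorConfig(report_note='quarterly trend run')  # hypothetical note
```

`default_factory` (rather than a shared default instance) is what keeps two configs from silently sharing one mutable `DataSelection`.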
class ClusteringConfiguration (**data: Any)
class ClusteringConfiguration(BaseConfig):
    """Configuration for clustering.

    If start_time or end_time is not provided, the missing value(s) will be
    determined automatically.

    Parameters
    ----------
    create_clusters: builtins.bool
        If True, then the service will attempt clustering.
    n_clusters: typing.Optional[builtins.int]
        Number of clusters of complete and non-constant time series.
    start_time: typing.Optional[datetime.datetime]
        Observations from start_time (inclusive) onwards will be considered during clustering.
    end_time: typing.Optional[datetime.datetime]
        Observations up to end_time (inclusive) will be considered during clustering.
    """
    create_clusters: bool = True
    n_clusters: Optional[int] = Field(default=None, gt=0)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    @model_validator(mode='after')
    def validate_times(self) -> 'ClusteringConfiguration':
        if self.start_time is not None and self.end_time is not None and self.start_time > self.end_time:
            raise ValueError('End time precedes start time.')
        return self

Configuration for clustering.

If start_time or end_time is not provided, the missing value(s) will be determined automatically.

Parameters

create_clusters : builtins.bool
If True, then the service will attempt clustering.
n_clusters : typing.Optional[builtins.int]
Number of clusters of complete and non-constant time series.
start_time : typing.Optional[datetime.datetime]
Observations from start_time (inclusive) onwards will be considered during clustering.
end_time : typing.Optional[datetime.datetime]
Observations up to end_time (inclusive) will be considered during clustering.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • BaseConfig
  • pydantic.main.BaseModel

Class variables

var create_clusters : bool
var end_time : datetime.datetime | None
var model_config
var n_clusters : int | None
var start_time : datetime.datetime | None

Methods

def validate_times(self) ‑> ClusteringConfiguration
@model_validator(mode='after')
def validate_times(self) -> 'ClusteringConfiguration':
    if self.start_time is not None and self.end_time is not None and self.start_time > self.end_time:
        raise ValueError('End time precedes start time.')
    return self
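The rule enforced by `validate_times` can be sketched with a plain dataclass; in the real model, pydantic's `model_validator(mode='after')` runs the same check after field parsing:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class ClusteringConfiguration:  # stand-in; the real class is a pydantic model
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Same rule as validate_times: if both bounds are set,
        # the window must not end before it starts.
        if (self.start_time is not None and self.end_time is not None
                and self.start_time > self.end_time):
            raise ValueError('End time precedes start time.')
```

Leaving either bound as `None` skips the check, matching the model's optional fields.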
class CovariateRef (**data: Any)
class CovariateRef(BaseModel):
    """Covariate reference.

    Parameters
    ----------
    name: builtins.str
        Name of the covariate.
    lag: builtins.int
        Lag by which the covariate was used.
    """
    name: str
    lag: int

Covariate reference.

Parameters

name : builtins.str
Name of the covariate.
lag : builtins.int
Lag by which the covariate was used.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var lag : int
var model_config
var name : str
class DataDefinition (**data: Any)
class DataDefinition(BaseConfig):
    """Model for the input parameter needed for the first CHECK-IN step.
    Every single column in your data must be accounted for. Each column must either be assigned a type (`date_column`,
    `value_columns`, `group_columns`) or be explicitly marked for removal in `remove_columns`.

    Parameters
    ----------
    date_column: futureexpert.checkin.DateColumn
        Definition of the date column. Must be a single column that contains the complete date information.
    value_columns: builtins.list[futureexpert.checkin.ValueColumn]
        Definitions of the value columns. Not all columns defined here must be used for time series creation;
        selecting a subset or combining is possible in a later step.
    group_columns: builtins.list[futureexpert.checkin.GroupColumn]
        Definitions of the group columns. Not all columns defined here must be used for time series creation; selecting
        a subset is possible in a later step. Grouping information can also be used to create hierarchical levels.
    remove_rows: typing.Optional[builtins.list[builtins.int]]
        Indexes of the rows to be removed before validation. Note: if the raw data was provided as a pandas
        DataFrame, the header is the first row (row index 0).
    remove_columns: typing.Optional[builtins.list[builtins.int]]
        Indexes of the columns to be removed before validation. Any column that is not assigned a type must be listed here.
    """
    date_column: DateColumn
    value_columns: list[ValueColumn]
    group_columns: list[GroupColumn] = []
    remove_rows: Optional[list[int]] = []
    remove_columns: Optional[list[int]] = []

Model for the input parameter needed for the first CHECK-IN step. Every single column in your data must be accounted for. Each column must either be assigned a type (date_column, value_columns, group_columns) or be explicitly marked for removal in remove_columns.

Parameters

date_column : DateColumn
Definition of the date column. Must be a single column that contains the complete date information.
value_columns : builtins.list[ValueColumn]
Definitions of the value columns. Not all columns defined here must be used for time series creation; selecting a subset or combining is possible in a later step.
group_columns : builtins.list[GroupColumn]
Definitions of the group columns. Not all columns defined here must be used for time series creation; selecting a subset is possible in a later step. Grouping information can also be used to create hierarchical levels.
remove_rows : typing.Optional[builtins.list[builtins.int]]
Indexes of the rows to be removed before validation. Note: if the raw data was provided as a pandas DataFrame, the header is the first row (row index 0).
remove_columns : typing.Optional[builtins.list[builtins.int]]
Indexes of the columns to be removed before validation. Any column that is not assigned a type must be listed here.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • BaseConfig
  • pydantic.main.BaseModel

Class variables

var date_column : DateColumn
var group_columns : list[GroupColumn]
var model_config
var remove_columns : list[int] | None
var remove_rows : list[int] | None
var value_columns : list[ValueColumn]
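The "every column accounted for" rule lends itself to a quick pre-flight check before the first CHECK-IN step. The helper below is purely illustrative (it is not part of the package, and the 0-based index convention is an assumption):

```python
def unaccounted_columns(n_columns: int,
                        date_column: int,
                        value_columns: list[int],
                        group_columns: list[int],
                        remove_columns: list[int]) -> list[int]:
    """Return the 0-based column indexes that are neither typed nor removed."""
    covered = {date_column, *value_columns, *group_columns, *remove_columns}
    return [i for i in range(n_columns) if i not in covered]

# A 5-column file: date in column 0, values in 1-2, a group in 3, column 4 dropped.
assert unaccounted_columns(5, 0, [1, 2], [3], [4]) == []
```

Any index the helper returns would need to be added to one of the column lists or to `remove_columns` before validation can succeed.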
class DataSelection (**data: Any)
class DataSelection(BaseConfig):
    """Time series selection.

    Parameters
    ----------
    version: typing.Optional[builtins.str]
        Time series version to be used. If None, then the latest version is used that matches the given filter.
    filter: builtins.dict[builtins.str, typing.Any]
        Filter to select a time series version based on its metadata, e.g. description.
        Only applies if version is not specified.
    """

    version: Optional[str] = None
    filter: dict[str, Any] = Field(default_factory=dict)

Time series selection.

Parameters

version : typing.Optional[builtins.str]
Time series version to be used. If None, then the latest version is used that matches the given filter.
filter : builtins.dict[builtins.str, typing.Any]
Filter to select a time series version based on its metadata, e.g. description. Only applies if version is not specified.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • BaseConfig
  • pydantic.main.BaseModel

Class variables

var filter : dict[str, typing.Any]
var model_config
var version : str | None
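The version-versus-filter precedence can be sketched over plain dicts. The exact matching rule shown here (exact metadata equality, newest match wins) is an assumption for illustration, not the documented backend behaviour:

```python
from typing import Any, Optional

def select_version(versions: list[dict[str, Any]],
                   version: Optional[str] = None,
                   filter: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Pick a time series version the way DataSelection describes it (sketch)."""
    if version is not None:
        # An explicit version wins; the filter is ignored in that case.
        return next(v for v in versions if v['id'] == version)
    criteria = filter or {}
    # Otherwise: newest version whose metadata matches every filter entry.
    matching = [v for v in versions
                if all(v['meta'].get(k) == val for k, val in criteria.items())]
    return max(matching, key=lambda v: v['created'])

versions = [
    {'id': 'v1', 'created': 1, 'meta': {'description': 'sales'}},
    {'id': 'v2', 'created': 2, 'meta': {'description': 'sales'}},
    {'id': 'v3', 'created': 3, 'meta': {'description': 'stock'}},
]
```

In the real client this selection happens server-side; the caller only supplies `DataSelection(version=...)` or `DataSelection(filter=...)`.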
class ExpertClient (refresh_token: Optional[str] = None,
access_token: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None,
timeout: int = 300,
max_retries: int = 3)
class ExpertClient:
    """Client for the FutureEXPERT REST API.

    This client provides the same interface as futureexpert.ExpertClient but
    communicates with the expert-api REST API instead of directly with the backend.

    It can be used as a drop-in replacement for ExpertClient when you want to
    use the REST API instead of the Python SDK.
    """

    def __init__(
        self,
        refresh_token: Optional[str] = None,
        access_token: Optional[str] = None,
        group: Optional[str] = None,
        environment: Optional[Literal['production', 'staging', 'development']] = None,
        timeout: int = 300,
        max_retries: int = 3
    ) -> None:
        """Initialize the client from a token.

        If you want to login using username and password, consider using ExpertClient.from_user_password.

        Parameters
        ----------
        refresh_token
            Authentication refresh token for Bearer authentication.
            If not provided, uses environment variable FUTURE_REFRESH_TOKEN.

            You can retrieve a long-lived refresh token (offline token) in the user settings of the futureEXPERT Dashboard
            or using the Open ID Connect token endpoint of our identity provider.

            Example for calling the token endpoint with scope `offline_access`:
            curl -s -X POST "https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token" \
                    -H "Content-Type: application/x-www-form-urlencoded" \
                    --data-urlencode "client_id=expert" \
                    --data-urlencode "grant_type=password" \
                    --data-urlencode "scope=openid offline_access" \
                    --data-urlencode "username=$FUTURE_USER" \
                    --data-urlencode "password=$FUTURE_PW" | jq -r .refresh_token
        access_token
            Authentication access token for Bearer authentication.

            If used instead of refresh_token, no automated token refresh is possible.
        group
            Optional group name for users in multiple groups.
            If not provided, uses environment variable FUTURE_GROUP.
        environment
            Optional environment (production, staging, development).
            If not provided, uses environment variable FUTURE_ENVIRONMENT.
        timeout
            Request timeout in seconds (default: 300)
        max_retries
            Maximum number of retries for failed requests (default: 3)
        """
        self.environment = cast(Literal['production', 'staging', 'development'],
                                environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')
        self.api_url = os.getenv('EXPERT_API_URL', _EXPERT_API_URLS[self.environment]).rstrip('/')
        self.auth_client = FutureAuthClient(environment=self.environment)
        self.group = group or os.getenv('FUTURE_GROUP')

        refresh_token = refresh_token or os.getenv('FUTURE_REFRESH_TOKEN')
        if refresh_token:
            self._oauth_token = self.auth_client.refresh_token(refresh_token)
        else:
            if access_token:
                # Decode access_token token for token signature validation
                self.auth_client.decode_token(access_token)

                # A token without `refresh_token` is never tried to be refreshed in OAuth2Client.
                # A token without `expires_at` / `expires_in` is considered not expired by OAuth2Client.
                self._oauth_token = {'access_token': access_token}
            else:
                raise ValueError(
                    'A token must be provided via parameter `refresh_token` or `access_token` '
                    'or FUTURE_REFRESH_TOKEN environment variable.\nAlternatively, use `.from_user_password`.'
                )

        if not self.group:
            authorized_groups = self.auth_client.get_user_groups(self._oauth_token['access_token'])
            if len(authorized_groups) == 1:
                self.group = authorized_groups[0]
            else:
                raise ValueError(
                    f'You have access to multiple groups. Please select one of the following: {authorized_groups}')

        self.timeout = timeout
        self.max_retries = max_retries
        self.report_status_cache: LRUCache[str, ReportStatus] = LRUCache(maxsize=5)

        logger.info('Successfully logged in to futureEXPERT.')

    def _update_token(self, token: dict[str, Any], refresh_token: str = '', access_token: str = '') -> None:
        """Callback for authlib's OAuth2Client to store a refreshed token.

        The signature (token, refresh_token, access_token) is required by
        OAuth2Client's update_token interface.

        Parameters
        ----------
        token: builtins.dict[builtins.str, typing.Any]
        refresh_token: builtins.str
        access_token: builtins.str
        """
        self._oauth_token = token

    @property
    def oauth2_client(self) -> OAuth2Client:
        # Create httpx client with retry transport
        transport = httpx.HTTPTransport(retries=self.max_retries)

        return OAuth2Client(
            client_id=self.auth_client.auth_configuration.auth_client_id,
            token_endpoint=self.auth_client.openid_configuration.token_endpoint,
            token_endpoint_auth_method=self.auth_client.auth_configuration.token_endpoint_auth_method,
            token=self._oauth_token,
            update_token=self._update_token,
            leeway=30,
            base_url=self.api_url,
            timeout=self.timeout,
            transport=transport
        )

    def _request(
        self,
        method: str,
        path: str,
        params: Mapping[str, Any] = {},
        json_data: Optional[Dict[str, Any]] = None,
        files: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Make HTTP request to the API.

        Parameters
        ----------
        method: builtins.str
            HTTP method (GET, POST, etc.)
        path: builtins.str
            API endpoint path.
        params: typing.Mapping[builtins.str, typing.Any]
            Query parameters
        json_data: typing.Optional[typing.Dict[builtins.str, typing.Any]]
            JSON request body
        files: typing.Optional[typing.Dict[builtins.str, typing.Any]]
            Files for multipart upload
        data: typing.Optional[typing.Dict[builtins.str, typing.Any]]
            Form data for multipart upload

        Returns
        -------
        Response data (parsed JSON)

        Raises
        ------
        httpx.HTTPStatusError
            If the request fails
        """
        try:
            params_with_group = {**params, 'group': self.group}
            with self.oauth2_client as client:
                response = client.request(
                    method=method,
                    url=path,
                    params=params_with_group,
                    json=json_data,
                    files=files,
                    data=data
                )
            response.raise_for_status()

            # Return parsed JSON or None for empty responses
            if response.content:
                return response.json()
            return None

        except httpx.HTTPStatusError as e:
            logger.error(
                f'API request {method} {path} failed with status code {e.response.status_code}: {e.response.text}')
            if e.response.status_code == 400:
                raise ValueError(e.response.text)
            if e.response.status_code == 500:
                raise RuntimeError(e.response.text)
            raise

    # ==================== Data Upload and Check-in ====================

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def upload_data(
        self,
        source: Union[pd.DataFrame, str],
        file_specification: Optional[FileSpecification] = None
    ) -> Any:
        """Upload raw data for further processing.

        Parameters
        ----------
        source: typing.Union[pandas.DataFrame, builtins.str]
            Path to a CSV file or a pandas DataFrame.
        file_specification: typing.Optional[futureexpert.checkin.FileSpecification]
            File specification for CSV parsing.

        Returns
        -------
        Upload feedback with user_input_id and file_uuid
        """
        if isinstance(source, pd.DataFrame):
            # Convert DataFrame to JSON for upload
            data_json = source.to_dict(orient='records')
            form_data = {
                'data': json.dumps(data_json)
            }
            if file_specification:
                form_data['file_specification'] = json.dumps(file_specification.model_dump())

            return self._request('POST', '/api/v1/check-in/data', data=form_data)
        else:
            # Upload file
            with open(source, 'rb') as f:
                files = {'file': f}
                data = {}
                if file_specification:
                    data['file_specification'] = json.dumps(file_specification.model_dump())

                return self._request('POST', '/api/v1/check-in/data', files=files, data=data)

    def get_data(self) -> Any:
        """Get available raw data.

        Returns
        -------
        Meta information of the data already uploaded.
        """
        return self._request('GET', '/api/v1/check-in/data')

    @validate_call
    def check_data_definition(
        self,
        user_input_id: str,
        file_uuid: str,
        data_definition: DataDefinition,
        file_specification: FileSpecification = FileSpecification()
    ) -> Any:
        """Check data definition.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Data definition specification.
        file_specification: futureexpert.checkin.FileSpecification
            File specification for CSV parsing.

        Returns
        -------
        Validation result
        """
        logger.info('Started data definition using CHECK-IN...')
        payload = {
            'user_input_id': user_input_id,
            'file_uuid': file_uuid,
            'data_definition': data_definition.model_dump(),
            'file_specification': file_specification.model_dump()
        }
        result = self._request('POST', '/api/v1/check-in/validate', json_data=payload)
        logger.info('Finished data definition.')
        return result

    @validate_call
    def create_time_series(
        self,
        user_input_id: str,
        file_uuid: str,
        data_definition: Optional[DataDefinition] = None,
        config_ts_creation: Optional[TsCreationConfig] = None,
        config_checkin: Optional[str] = None,
        file_specification: FileSpecification = FileSpecification()
    ) -> Any:
        """Create time series from already uploaded data.

        This is the second step of the check-in process, after upload_data.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input (from upload_data response).
        file_uuid: builtins.str
            UUID of the file (from upload_data response).
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Data definition specification.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Time series creation configuration.
        config_checkin: typing.Optional[builtins.str]
            Path to JSON config file (alternative to data_definition + config_ts_creation).
        file_specification: futureexpert.checkin.FileSpecification
            File specification for CSV parsing.

        Returns
        -------
        Time series creation result with version information
        """
        logger.info('Creating time series using CHECK-IN...')
        form_data: Dict[str, Any] = {
            'user_input_id': user_input_id,
            'file_uuid': file_uuid,
        }

        if data_definition:
            form_data['data_definition'] = json.dumps(data_definition.model_dump())
        if config_ts_creation:
            form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
        if file_specification:
            form_data['file_specification'] = json.dumps(file_specification.model_dump())

        files: Dict[str, Any] = {}
        if config_checkin:
            files['config_checkin'] = open(config_checkin, 'rb')

        try:
            result = self._request('POST', '/api/v1/check-in/create', files=files or None, data=form_data)
            logger.info('Finished time series creation.')
            return result
        finally:
            for f in files.values():
                f.close()

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def check_in_time_series(
        self,
        raw_data_source: Union[pd.DataFrame, Path, str],
        data_definition: Optional[DataDefinition] = None,
        config_ts_creation: Optional[TsCreationConfig] = None,
        config_checkin: Optional[str] = None,
        file_specification: FileSpecification = FileSpecification()
    ) -> str:
        """Check in time series data.

        Only available in `Standard`, `Premium` and `Enterprise` subscription packages.

        Parameters
        ----------
        raw_data_source: typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
            DataFrame with raw data or path to CSV file.
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Data definition specification.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Time series creation configuration.
        config_checkin: typing.Optional[builtins.str]
            Path to JSON config file (alternative to data_definition + config_ts_creation).
        file_specification: futureexpert.checkin.FileSpecification
            File specification for CSV parsing.

        Returns
        -------
        Version ID of the created time series
        """
        form_data: Dict[str, Any] = {}
        files: Dict[str, Any] = {}

        if data_definition:
            form_data['data_definition'] = json.dumps(data_definition.model_dump())
        if config_ts_creation:
            form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
        if config_checkin:
            files['config_checkin'] = open(config_checkin, 'rb')
        if file_specification:
            form_data['file_specification'] = json.dumps(file_specification.model_dump())

        try:

            if isinstance(raw_data_source, pd.DataFrame):

                with tempfile.TemporaryDirectory() as tmpdir:
                    time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
                    file_path = os.path.join(tmpdir, f'expert-{time_stamp}.csv')
                    date_format = data_definition.date_column.format if data_definition else None
                    raw_data_source.to_csv(path_or_buf=file_path, index=False, sep=file_specification.delimiter,
                                           decimal=file_specification.decimal, encoding='utf-8-sig',
                                           date_format=date_format)
                    files['file'] = open(file_path, 'rb')
                    result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)

            else:
                files['file'] = open(raw_data_source, 'rb')
                result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)
            return str(result['version_id'])
        finally:
            for f in files.values():
                f.close()

    @validate_call
    def check_in_pool_covs(
        self,
        requested_pool_covs: List[PoolCovDefinition],
        description: Optional[str] = None
    ) -> CheckInPoolResult:
        """Create a new version from pool covariates.

        Parameters
        ----------
        requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition]
            List of pool covariate definitions.
        description: typing.Optional[builtins.str]
            Short description of the selected covariates.

        Returns
        -------
        CheckInPoolResult with version_id and metadata
        """
        logger.info('Creating time series using checkin-pool...')
        payload = {
            'requested_pool_covs': [cov.model_dump() for cov in requested_pool_covs],
            'description': description
        }
        result = self._request('POST', '/api/v1/check-in/pool-covariate', json_data=payload)
        logger.info('Finished time series creation.')
        return CheckInPoolResult(**result)

    # ==================== Time Series ====================

    @validate_call
    def get_time_series(self, version_id: str) -> CheckInResult:
        """Get time series data by version ID.

        Parameters
        ----------
        version_id: builtins.str
            Time series version ID.

        Returns
        -------
        CheckInResult with time series data
        """
        result = self._request('GET', f'/api/v1/ts/{version_id}')
        return CheckInResult(**result)

    @validate_call
    def get_ts_versions(self, skip: int = 0, limit: int = 100) -> PydanticModelList[TimeSeriesVersion]:
        """Get list of time series versions.

        Parameters
        ----------
        skip: builtins.int
            Number of items to skip.
        limit: builtins.int
            Maximum number of items to return.

        Returns
        -------
        PydanticModelList of TimeSeriesVersion entries.
        """
        params = {'skip': skip, 'limit': limit}
        results = self._request('GET', '/api/v1/ts', params=params)
        return PydanticModelList([TimeSeriesVersion.model_validate(raw_result) for raw_result in results])

    # ==================== Pool Covariates ====================

    @validate_call
    def get_pool_cov_overview(
        self,
        granularity: Optional[str] = None,
        search: Optional[str] = None
    ) -> PoolCovOverview:
        """Get overview of available pool covariates.

        Parameters
        ----------
        granularity: typing.Optional[builtins.str]
            Filter by granularity (Day or Month).
        search: typing.Optional[builtins.str]
            Full-text search query.

        Returns
        -------
        PoolCovOverview with available covariates
        """
        params = {}
        if granularity:
            params['granularity'] = granularity
        if search:
            params['search'] = search

        result = self._request('GET', '/api/v1/pool', params=params)
        return PoolCovOverview(overview_json=result['overview_json'])

    # ==================== Forecasting ====================

    @validate_call
    def start_forecast(
        self,
        version: str,
        config: ReportConfig,
        reconciliation_config: Optional[ReconciliationConfig] = None
    ) -> Union[ReportIdentifier, ChainedReportIdentifier]:
        """Start a forecasting report.

        Parameters
        ----------
        version: builtins.str
            Time series version ID.
        config: futureexpert.forecast.ReportConfig
            Forecast configuration.
        reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]
            Configuration to make forecasts consistent over hierarchical levels.

        Returns
        -------
        ReportIdentifier with report_id and settings_id.
        If reconciliation_config is provided, returns ChainedReportIdentifier
        with prerequisites containing the forecast report identifier.
        return: typing.Union[futureexpert.shared_models.ReportIdentifier, futureexpert.shared_models.ChainedReportIdentifier]

    """
        payload: Dict[str, Any] = {
            'version': version,
            'config': config.model_dump()
        }

        if reconciliation_config is not None:
            payload['reconciliation_config'] = reconciliation_config.model_dump()

        logger.info('Started creating FORECAST...')
        result = self._request('POST', '/api/v1/forecast', json_data=payload)

        identifier_model = ChainedReportIdentifier if 'prerequisites' in result else ReportIdentifier
        report_identifier = identifier_model.model_validate(result)
        logger.info(f'Report created with ID {report_identifier.report_id}. Forecasts are running...')
        return report_identifier

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def start_forecast_from_raw_data(self,
                                     raw_data_source: Union[pd.DataFrame, Path, str],
                                     config_fc: ReportConfig,
                                     data_definition: Optional[DataDefinition] = None,
                                     config_ts_creation: Optional[TsCreationConfig] = None,
                                     config_checkin: Optional[str] = None,
                                     file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
        """Starts a forecast run from raw data without the possibility of inspecting interim results from the data preparation.

        Parameters
        ----------
        raw_data_source: typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
            A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
        config_fc: futureexpert.forecast.ReportConfig
            The configuration of the forecast run.
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Specifies the data, value and group columns and which rows and columns should be removed.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Defines filter and aggregation level of the time series.
        config_checkin: typing.Optional[builtins.str]
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The identifier of the forecasting report.
        return: futureexpert.shared_models.ReportIdentifier

    """

        assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.'

        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        res2 = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

        version = res2['result']['tsVersion']
        return self.start_forecast(version=version, config=config_fc)

    @validate_call
    def get_fc_results(
        self,
        id: Union[ReportIdentifier, int],
        include_k_best_models: int = 1,
        include_backtesting: bool = False,
        include_discarded_models: bool = False
    ) -> ForecastResults:
        """Get forecast results.

        Parameters
        ----------
        id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
            Report identifier or report ID.
        include_k_best_models: builtins.int
            Number of best models to include.
        include_backtesting: builtins.bool
            Include backtesting results.
        include_discarded_models: builtins.bool
            Include discarded models.

        Returns
        -------
        ForecastResults with forecast data
        return: futureexpert.forecast.ForecastResults

    """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id
        report_status = self.get_report_status(id=id)
        has_results = self._can_load_results(report_status)

        if not has_results:
            return ForecastResults(forecast_results=[])

        params = {
            'include_k_best_models': include_k_best_models,
            'include_backtesting': include_backtesting,
            'include_discarded_models': include_discarded_models
        }
        result = self._request('GET', f'/api/v1/forecast/{report_id}/results', params=params)

        # Parse results
        forecast_results = [ForecastResult.model_validate(r) for r in result['forecast_results']]
        fc_results = ForecastResults(forecast_results=forecast_results)

        if result.get('consistency') is not None:
            fc_results.consistency = ConsistentForecastMetadata.model_validate(result['consistency'])

        return fc_results

    # ==================== Matcher ====================

    @validate_call
    def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
        """Start a covariate matcher report.

        Parameters
        ----------
        config: futureexpert.matcher.MatcherConfig
            Matcher configuration.

        Returns
        -------
        ReportIdentifier with report_id and settings_id
        return: futureexpert.shared_models.ReportIdentifier

    """
        payload = {'config': config.model_dump()}
        result = self._request('POST', '/api/v1/matcher', json_data=payload)
        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Matching indicators...')
        return report

    @validate_call
    def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> List[MatcherResult]:
        """Get matcher results.

        Parameters
        ----------
        id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
            Report identifier or report ID.

        Returns
        -------
        List of MatcherResult objects
        return: typing.List[futureexpert.matcher.MatcherResult]

    """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        report_status = self.get_report_status(id=id)
        has_results = self._can_load_results(report_status)

        if not has_results:
            return []

        result = self._request('GET', f'/api/v1/matcher/{report_id}/results')
        return [MatcherResult(**r) for r in result]

    # ==================== Associator ====================

    @validate_call
    def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
        """Start an associator report.

        Parameters
        ----------
        config: futureexpert.associator.AssociatorConfig
            Associator configuration.

        Returns
        -------
        ReportIdentifier with report_id and settings_id
        return: futureexpert.shared_models.ReportIdentifier

    """
        payload = {'config': config.model_dump()}
        result = self._request('POST', '/api/v1/associator', json_data=payload)
        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Associator is running...')
        return report

    @validate_call
    def get_associator_results(self, id: Union[ReportIdentifier, int]) -> Optional[AssociatorResult]:
        """Get associator results.

        Parameters
        ----------
        id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
            Report identifier or report ID.

        Returns
        -------
        Results of the ASSOCIATOR report.
        return: typing.Optional[futureexpert.associator.AssociatorResult]

    """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        report_status = self.get_report_status(id=id)
        has_results = self._can_load_results(report_status)

        if not has_results:
            return None

        result = self._request('GET', f'/api/v1/associator/{report_id}/results')
        return AssociatorResult(**result)

    # ==================== Reports ====================

    @validate_call
    def get_reports(self, skip: int = 0, limit: int = 100) -> PydanticModelList[ReportSummary]:
        """Get list of available reports.

        Parameters
        ----------
        skip: builtins.int
            Number of items to skip.
        limit: builtins.int
            Maximum number of items to return.

        Returns
        -------
        The available reports from newest to oldest.
        return: futureexpert.shared_models.PydanticModelList[futureexpert.shared_models.ReportSummary]

    """
        params = {'skip': skip, 'limit': limit}
        result = self._request('GET', '/api/v1/report', params=params)
        return PydanticModelList([ReportSummary.model_validate(report) for report in result])

    @validate_call
    def _get_single_report_status(self, report_identifier: ReportIdentifier, include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a single report.

        Parameters
        ----------
        report_identifier: futureexpert.shared_models.ReportIdentifier
            Report identifier.
        include_error_reason: builtins.bool
            Determines whether log messages are to be included in the result.

        Returns
        -------
        The status of the report.
        return: futureexpert.shared_models.ReportStatus

    """
        report_id = report_identifier.report_id

        cache_key = f'{report_id}_{include_error_reason}'
        if cache_key in self.report_status_cache:
            return self.report_status_cache[cache_key]

        # Determine endpoint based on report type
        report_type = self.get_report_type(report_identifier=report_id)

        # Use specific endpoint based on type
        params = {'include_error_reason': include_error_reason}
        if report_type in ['forecast', 'MongoForecastingResultSink', 'hierarchical-forecast']:
            raw_result = self._request('GET', f'/api/v1/forecast/{report_id}/status', params=params)
        elif report_type in ['matcher', 'CovariateSelection']:
            raw_result = self._request('GET', f'/api/v1/matcher/{report_id}/status', params=params)
        elif report_type == 'associator':
            raw_result = self._request('GET', f'/api/v1/associator/{report_id}/status', params=params)
        elif report_type == 'shaper':
            raw_result = self._request('GET', f'/api/v1/shaper/{report_id}/status', params=params)
        else:
            raise RuntimeError(f'Unsupported report type {report_type}')

        result = ReportStatus(**raw_result)
        if result.progress.requested == result.progress.finished and result.is_finished:
            self.report_status_cache[cache_key] = result

        return result

    @validate_call
    def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a report.

        If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

        Parameters
        ----------
        id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.
        include_error_reason: builtins.bool
            Determines whether log messages are to be included in the result.

        Returns
        -------
        The status of the report.
        return: futureexpert.shared_models.ReportStatus

    """
        identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)

        final_status = self._get_single_report_status(
            report_identifier=identifier, include_error_reason=include_error_reason)
        if isinstance(identifier, ChainedReportIdentifier):
            for prerequisite_identifier in identifier.prerequisites:
                prerequisite_status = self.get_report_status(id=prerequisite_identifier,
                                                             include_error_reason=include_error_reason)
                final_status.prerequisites.append(prerequisite_status)
        return final_status

    def _can_load_results(self, report_status: ReportStatus) -> bool:
        """Checks if the results of a report can be returned and creates log messages.

    Parameters
    ----------
    report_status: futureexpert.shared_models.ReportStatus

    return: builtins.bool

    """

        if report_status.progress.finished == 0:
            logger.warning('The report is not finished. No results to return.')
            return False

        if report_status.progress.finished != report_status.progress.requested:
            logger.warning('The report is not finished.')

        if report_status.result_type == 'matcher':
            if report_status.progress.finished < report_status.progress.requested and report_status.results.successful > 0:
                logger.warning('The report is not finished. Returning incomplete results.')
                return True
            if report_status.results.successful == 0:
                logger.warning('No results to return. Check `get_report_status` for details.')
                return False

        if report_status.result_type != 'matcher':
            if report_status.progress.finished < report_status.progress.requested \
                    and (report_status.results.successful > 0 or report_status.results.no_evaluation > 0):
                logger.warning('The report is not finished. Returning incomplete results.')
                return True
            if report_status.results.successful == 0 and report_status.results.no_evaluation == 0:
                logger.warning(
                    'Zero runs were successful. No results can be returned. Check `get_report_status` for details.')
                return False

        return True

    @validate_call
    def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
        """Get report type.

        Parameters
        ----------
        report_identifier: typing.Union[builtins.int, futureexpert.shared_models.ReportIdentifier]
            Report ID or identifier.

        Returns
        -------
        Report type string
        return: builtins.str

    """
        report_id = report_identifier.report_id if isinstance(
            report_identifier, ReportIdentifier
        ) else report_identifier

        result = self._request('GET', f'/api/v1/report/{report_id}')
        return str(result['type'])

    @validate_call
    def start_making_forecast_consistent(
        self,
        config: MakeForecastConsistentConfiguration
    ) -> ReportIdentifier:
        """Start hierarchical forecast reconciliation process.

        Makes forecasts consistent across hierarchical levels.

        Parameters
        ----------
        config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration
            Configuration for the reconciliation process.

        Returns
        -------
        ReportIdentifier with report_id and settings_id
        return: futureexpert.shared_models.ReportIdentifier

    """
        payload: Dict[str, Any] = {
            'data_selection': config.data_selection.model_dump(),
            'report_note': config.report_note
        }

        if config.db_name:
            payload['db_name'] = config.db_name
        if config.reconciliation:
            payload['reconciliation'] = config.reconciliation.model_dump()

        logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
        result = self._request('POST', '/api/v1/forecast/reconcile', json_data=payload)
        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...')
        return report

    @validate_call
    def create_scenario_values(self,
                               config: ScenarioValuesConfig) -> ShaperConfig:
        """Creates scenario values for covariates based on a time series and forecast horizon.

        Parameters
        ----------
        config: futureexpert.shaper.ScenarioValuesConfig
            Configuration for the creation of scenario values.

        Returns
        -------
        A list of Scenario objects containing high and low projections for each covariate.
        return: futureexpert.shaper.ShaperConfig

    """

        payload = {'config': config.model_dump(mode='json')}
        result = self._request('POST', '/api/v1/shaper/prepare', json_data=payload)
        return ShaperConfig(**result)

    @validate_call
    def start_scenario_forecast(self, config: ShaperConfig) -> ReportIdentifier:
        """Start forecast for scenarios.

        Parameters
        ----------
        config: futureexpert.shaper.ShaperConfig
            Configuration for a SHAPER run.
        return: futureexpert.shared_models.ReportIdentifier

    """
        ref_config = copy.deepcopy(config)
        for scenario in ref_config.scenarios:
            if isinstance(scenario.ts, Covariate):
                scenario.ts = CovariateRef(name=scenario.ts.ts.name, lag=scenario.ts.lag)

        payload = {'config': ref_config.model_dump(mode='json')}

        result = self._request('POST', '/api/v1/shaper', json_data=payload)
        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Shaping scenarios...')
        return report

    @staticmethod
    def from_user_password(dotenv_path: Optional[str] = None) -> ExpertClient:
        """Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

    Parameters
    ----------
    dotenv_path: typing.Optional[builtins.str]

    return: futureexpert.expert_client.ExpertClient

    """
        load_dotenv(dotenv_path=dotenv_path)
        environment = cast(Literal['production', 'staging', 'development'], os.getenv('FUTURE_ENVIRONMENT'))
        try:
            future_user = os.environ['FUTURE_USER']
        except KeyError:
            raise MissingCredentialsError('username') from None
        try:
            future_password = os.environ['FUTURE_PW']
        except KeyError:
            raise MissingCredentialsError('password') from None
        auth_client = FutureAuthClient(environment=environment)
        token = auth_client.token(future_user, future_password)
        return ExpertClient(refresh_token=token['refresh_token'], environment=environment)

    @validate_call
    def get_shaper_results(self, id: Union[ReportIdentifier, int]) -> Optional[ShaperResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.

        Returns
        -------
        Results of the SHAPER report.
        return: typing.Optional[futureexpert.shaper.ShaperResult]

    """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        report_status = self.get_report_status(id=id)
        has_results = self._can_load_results(report_status)

        if not has_results:
            return None

        result = self._request('GET', f'/api/v1/shaper/{report_id}/results')
        return ShaperResult(**result)

    def logout(self) -> None:
        """Logout from futureEXPERT.

        If logged in with a refresh token, the refresh token is revoked.

    Returns
    -------
    return: builtins.NoneType

    """
        if (refresh_token := self._oauth_token.get('refresh_token')) is None:
            raise RuntimeError('Cannot logout without refresh_token')
        self.auth_client.logout(refresh_token)
        logger.info('Successfully logged out.')
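
The guard implemented by `_can_load_results` above reduces to a small decision table: nothing finished means no results, a partially finished report with at least one usable run returns incomplete results, and zero usable runs returns nothing. A minimal sketch with plain dataclasses (illustrative stand-ins, not the package's models):

```python
from dataclasses import dataclass


@dataclass
class Progress:
    requested: int
    finished: int


@dataclass
class Results:
    successful: int
    no_evaluation: int = 0


def can_load_results(progress: Progress, results: Results,
                     result_type: str = 'forecast') -> bool:
    # Nothing finished yet: there is nothing to return.
    if progress.finished == 0:
        return False
    # For matcher reports only successful runs count; other report
    # types also treat no_evaluation runs as usable.
    usable = results.successful
    if result_type != 'matcher':
        usable += results.no_evaluation
    # Partially finished with usable runs: incomplete results are returned.
    if progress.finished < progress.requested and usable > 0:
        return True
    return usable > 0
```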

Client for the futureEXPERT REST API.

This client provides the same interface as futureexpert.ExpertClient but communicates with the expert-api REST API instead of directly with the backend.

It can be used as a drop-in replacement for ExpertClient when you want to use the REST API instead of the Python SDK.

Initialize the client from a token.

If you want to login using username and password, consider using ExpertClient.from_user_password.

Parameters

refresh_token

Authentication refresh token for Bearer authentication. If not provided, uses environment variable FUTURE_REFRESH_TOKEN.

You can retrieve a long-lived refresh token (offline token) in the user settings of the futureEXPERT Dashboard or using the Open ID Connect token endpoint of our identity provider.

Example for calling the token endpoint with scope offline_access: curl -s -X POST "https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token" -H "Content-Type: application/x-www-form-urlencoded" --data-urlencode "client_id=expert" --data-urlencode "grant_type=password" --data-urlencode "scope=openid offline_access" --data-urlencode "username=$FUTURE_USER" --data-urlencode "password=$FUTURE_PW" | jq -r .refresh_token

access_token

Authentication access token for Bearer authentication.

If used instead of refresh_token, no automated token refresh is possible.

group
Optional group name for users in multiple groups. If not provided, uses environment variable FUTURE_GROUP.
environment
Optional environment (production, staging, development). If not provided, uses environment variable FUTURE_ENVIRONMENT.
timeout
Request timeout in seconds (default: 300)
max_retries
Maximum number of retries for failed requests (default: 3)
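
The fallback order described above for `group` and `environment` (explicit argument first, then the environment variable) can be sketched as follows; `resolve_setting` is an illustrative helper, not part of the package:

```python
import os


def resolve_setting(explicit, env_var, default=None):
    """Return the explicitly passed value if given; otherwise fall back
    to the named environment variable, then to the default.
    Illustrative helper mirroring the parameter descriptions above."""
    if explicit is not None:
        return explicit
    return os.environ.get(env_var, default)


# e.g. group = resolve_setting(group_arg, 'FUTURE_GROUP')
#      environment = resolve_setting(environment_arg, 'FUTURE_ENVIRONMENT')
```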

Static methods

def from_user_password(dotenv_path: Optional[str] = None) ‑> ExpertClient
Expand source code
@staticmethod
def from_user_password(dotenv_path: Optional[str] = None) -> ExpertClient:
    """Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

Parameters
----------
dotenv_path: typing.Optional[builtins.str]

return: futureexpert.expert_client.ExpertClient

"""
    load_dotenv(dotenv_path=dotenv_path)
    environment = cast(Literal['production', 'staging', 'development'], os.getenv('FUTURE_ENVIRONMENT'))
    try:
        future_user = os.environ['FUTURE_USER']
    except KeyError:
        raise MissingCredentialsError('username') from None
    try:
        future_password = os.environ['FUTURE_PW']
    except KeyError:
        raise MissingCredentialsError('password') from None
    auth_client = FutureAuthClient(environment=environment)
    token = auth_client.token(future_user, future_password)
    return ExpertClient(refresh_token=token['refresh_token'], environment=environment)

Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

Parameters

dotenv_path : typing.Optional[builtins.str]
 
return : ExpertClient
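
The credential lookup in `from_user_password` translates each missing environment variable into a named error instead of a bare `KeyError`. A self-contained sketch of that pattern, with a local stand-in for the package's `MissingCredentialsError`:

```python
import os


class MissingCredentialsError(Exception):
    """Local stand-in for futureexpert's exception type (illustrative)."""

    def __init__(self, what: str):
        super().__init__(f'Missing credential: {what}')


def read_credentials(env=os.environ):
    # Mirrors from_user_password: a missing variable raises a
    # MissingCredentialsError naming the credential.
    try:
        user = env['FUTURE_USER']
    except KeyError:
        raise MissingCredentialsError('username') from None
    try:
        password = env['FUTURE_PW']
    except KeyError:
        raise MissingCredentialsError('password') from None
    return user, password
```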
 

Instance variables

prop oauth2_client : OAuth2Client
Expand source code
@property
def oauth2_client(self) -> OAuth2Client:
    # Create httpx client with retry transport
    transport = httpx.HTTPTransport(retries=self.max_retries)

    return OAuth2Client(
        client_id=self.auth_client.auth_configuration.auth_client_id,
        token_endpoint=self.auth_client.openid_configuration.token_endpoint,
        token_endpoint_auth_method=self.auth_client.auth_configuration.token_endpoint_auth_method,
        token=self._oauth_token,
        update_token=self._update_token,
        leeway=30,
        base_url=self.api_url,
        timeout=self.timeout,
        transport=transport
    )

Methods

def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
@validate_call
def check_data_definition(
    self,
    user_input_id: str,
    file_uuid: str,
    data_definition: DataDefinition,
    file_specification: FileSpecification = FileSpecification()
) -> Any:
    """Check data definition.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Data definition specification.
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Validation result
    return: typing.Any

"""
    logger.info('Started data definition using CHECK-IN...')
    payload = {
        'user_input_id': user_input_id,
        'file_uuid': file_uuid,
        'data_definition': data_definition.model_dump(),
        'file_specification': file_specification.model_dump()
    }
    result = self._request('POST', '/api/v1/check-in/validate', json_data=payload)
    logger.info('Finished data definition.')
    return result

Check data definition.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : DataDefinition
Data definition specification.
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Validation result
 
return : typing.Any
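
The JSON body that `check_data_definition` POSTs to `/api/v1/check-in/validate` can be sketched with minimal stand-ins for the pydantic models; the dataclasses below are illustrative, not the package's real `DataDefinition` and `FileSpecification`:

```python
from dataclasses import dataclass, asdict


@dataclass
class DataDefinition:
    date_column: str
    value_column: str

    def model_dump(self):
        # Stand-in for pydantic's model_dump (illustrative).
        return asdict(self)


@dataclass
class FileSpecification:
    delimiter: str = ','
    decimal: str = '.'

    def model_dump(self):
        return asdict(self)


def build_validate_payload(user_input_id, file_uuid, data_definition,
                           file_specification):
    # Same shape as the body POSTed to /api/v1/check-in/validate.
    return {
        'user_input_id': user_input_id,
        'file_uuid': file_uuid,
        'data_definition': data_definition.model_dump(),
        'file_specification': file_specification.model_dump(),
    }
```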
 
def check_in_pool_covs(self,
requested_pool_covs: List[PoolCovDefinition],
description: Optional[str] = None) ‑> CheckInPoolResult
Expand source code
@validate_call
def check_in_pool_covs(
    self,
    requested_pool_covs: List[PoolCovDefinition],
    description: Optional[str] = None
) -> CheckInPoolResult:
    """Create a new version from pool covariates.

    Parameters
    ----------
    requested_pool_covs: typing.List[futureexpert.pool.PoolCovDefinition]
        List of pool covariate definitions.
    description: typing.Optional[builtins.str]
        Short description of the selected covariates.

    Returns
    -------
    CheckInPoolResult with version_id and metadata
    return: futureexpert.pool.CheckInPoolResult

"""
    logger.info('Creating time series using checkin-pool...')
    payload = {
        'requested_pool_covs': [cov.model_dump() for cov in requested_pool_covs],
        'description': description
    }
    result = self._request('POST', '/api/v1/check-in/pool-covariate', json_data=payload)
    logger.info('Finished time series creation.')
    return CheckInPoolResult(**result)

Create a new version from pool covariates.

Parameters

requested_pool_covs : typing.List[PoolCovDefinition]
List of pool covariate definitions.
description : typing.Optional[builtins.str]
Short description of the selected covariates.

Returns

CheckInPoolResult with version_id and metadata
 
return : CheckInPoolResult
 
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, Path, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> str
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def check_in_time_series(
    self,
    raw_data_source: Union[pd.DataFrame, Path, str],
    data_definition: Optional[DataDefinition] = None,
    config_ts_creation: Optional[TsCreationConfig] = None,
    config_checkin: Optional[str] = None,
    file_specification: FileSpecification = FileSpecification()
) -> str:
    """Check in time series data.

    Only available in `Standard`, `Premium` and `Enterprise` subscription packages.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
        DataFrame with raw data or path to CSV file.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Data definition specification.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Time series creation configuration.
    config_checkin: typing.Optional[builtins.str]
        Path to JSON config file (alternative to data_definition + config_ts_creation).
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Version ID of the created time series
    return: builtins.str

"""
    form_data: Dict[str, Any] = {}
    files: Dict[str, Any] = {}

    if data_definition:
        form_data['data_definition'] = json.dumps(data_definition.model_dump())
    if config_ts_creation:
        form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
    if config_checkin:
        files['config_checkin'] = open(config_checkin, 'rb')
    if file_specification:
        form_data['file_specification'] = json.dumps(file_specification.model_dump())

    try:

        if isinstance(raw_data_source, pd.DataFrame):

            with tempfile.TemporaryDirectory() as tmpdir:
                time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
                file_path = os.path.join(tmpdir, f'expert-{time_stamp}.csv')
                date_format = data_definition.date_column.format if data_definition else None
                raw_data_source.to_csv(path_or_buf=file_path, index=False, sep=file_specification.delimiter,
                                       decimal=file_specification.decimal, encoding='utf-8-sig',
                                       date_format=date_format)
                files['file'] = open(file_path, 'rb')
                result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)

        else:
            files['file'] = open(raw_data_source, 'rb')
            result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)
        return str(result['version_id'])
    finally:
        for f in files.values():
            f.close()

Check in time series data.

Only available in Standard, Premium and Enterprise subscription packages.

Parameters

raw_data_source : typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
DataFrame with raw data or path to CSV file.
data_definition : typing.Optional[DataDefinition]
Data definition specification.
config_ts_creation : typing.Optional[TsCreationConfig]
Time series creation configuration.
config_checkin : typing.Optional[builtins.str]
Path to JSON config file (alternative to data_definition + config_ts_creation).
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Version ID of the created time series
 
return : builtins.str
 
def create_scenario_values(self, config: ScenarioValuesConfig) ‑> ShaperConfig
Expand source code
@validate_call
def create_scenario_values(self,
                           config: ScenarioValuesConfig) -> ShaperConfig:
    """Creates scenario values for covariates based on a time series and forecast horizon.

    Parameters
    ----------
    config: futureexpert.shaper.ScenarioValuesConfig
        Configuration for the creation of scenario values.

    Returns
    -------
    A ShaperConfig containing high and low scenario projections for each covariate.
    return: futureexpert.shaper.ShaperConfig

"""

    payload = {'config': config.model_dump(mode='json')}
    result = self._request('POST', '/api/v1/shaper/prepare', json_data=payload)
    return ShaperConfig(**result)

Creates scenario values for covariates based on a time series and forecast horizon.

Parameters

config : ScenarioValuesConfig
Configuration for the creation of scenario values.

Returns

A ShaperConfig containing high and low scenario projections for each covariate.
return : ShaperConfig
 
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
@validate_call
def create_time_series(
    self,
    user_input_id: str,
    file_uuid: str,
    data_definition: Optional[DataDefinition] = None,
    config_ts_creation: Optional[TsCreationConfig] = None,
    config_checkin: Optional[str] = None,
    file_specification: FileSpecification = FileSpecification()
) -> Any:
    """Create time series from already uploaded data.

    This is the second step of the check-in process, after upload_data.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input (from upload_data response).
    file_uuid: builtins.str
        UUID of the file (from upload_data response).
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Data definition specification.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Time series creation configuration.
    config_checkin: typing.Optional[builtins.str]
        Path to JSON config file (alternative to data_definition + config_ts_creation).
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Time series creation result with version information
    return: typing.Any

"""
    logger.info('Creating time series using CHECK-IN...')
    form_data: Dict[str, Any] = {
        'user_input_id': user_input_id,
        'file_uuid': file_uuid,
    }

    if data_definition:
        form_data['data_definition'] = json.dumps(data_definition.model_dump())
    if config_ts_creation:
        form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
    if file_specification:
        form_data['file_specification'] = json.dumps(file_specification.model_dump())

    files: Dict[str, Any] = {}
    if config_checkin:
        files['config_checkin'] = open(config_checkin, 'rb')

    try:
        result = self._request('POST', '/api/v1/check-in/create', files=files or None, data=form_data)
        logger.info('Finished time series creation.')
        return result
    finally:
        for f in files.values():
            f.close()

Create time series from already uploaded data.

This is the second step of the check-in process, after upload_data.

Parameters

user_input_id : builtins.str
UUID of the user input (from upload_data response).
file_uuid : builtins.str
UUID of the file (from upload_data response).
data_definition : typing.Optional[DataDefinition]
Data definition specification.
config_ts_creation : typing.Optional[TsCreationConfig]
Time series creation configuration.
config_checkin : typing.Optional[builtins.str]
Path to JSON config file (alternative to data_definition + config_ts_creation).
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Time series creation result with version information
 
return : typing.Any
 
def get_associator_results(self, id: Union[ReportIdentifier, int]) ‑> AssociatorResult | None
Expand source code
@validate_call
def get_associator_results(self, id: Union[ReportIdentifier, int]) -> Optional[AssociatorResult]:
    """Get associator results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.

    Returns
    -------
    Results of the ASSOCIATOR report.
    return: typing.Optional[futureexpert.associator.AssociatorResult]

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return None

    result = self._request('GET', f'/api/v1/associator/{report_id}/results')
    return AssociatorResult(**result)

Get associator results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.

Returns

Results of the ASSOCIATOR report.
return : typing.Optional[AssociatorResult]
 
def get_data(self) ‑> Any
Expand source code
def get_data(self) -> Any:
    """Get available raw data.

    Returns
    -------
    Meta information of the data already uploaded.
    return: typing.Any

"""
    return self._request('GET', '/api/v1/check-in/data')

Get available raw data.

Returns

Meta information of the data already uploaded.

return : typing.Any
 
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False,
include_discarded_models: bool = False) ‑> ForecastResults
Expand source code
@validate_call
def get_fc_results(
    self,
    id: Union[ReportIdentifier, int],
    include_k_best_models: int = 1,
    include_backtesting: bool = False,
    include_discarded_models: bool = False
) -> ForecastResults:
    """Get forecast results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.
    include_k_best_models: builtins.int
        Number of best models to include.
    include_backtesting: builtins.bool
        Include backtesting results.
    include_discarded_models: builtins.bool
        Include discarded models.

    Returns
    -------
    ForecastResults with forecast data
    return: futureexpert.forecast.ForecastResults

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id
    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return ForecastResults(forecast_results=[])

    params = {
        'include_k_best_models': include_k_best_models,
        'include_backtesting': include_backtesting,
        'include_discarded_models': include_discarded_models
    }
    result = self._request('GET', f'/api/v1/forecast/{report_id}/results', params=params)

    # Parse results
    forecast_results = [ForecastResult.model_validate(r) for r in result['forecast_results']]
    fc_results = ForecastResults(forecast_results=forecast_results)

    if result.get('consistency') is not None:
        fc_results.consistency = ConsistentForecastMetadata.model_validate(result['consistency'])

    return fc_results

Get forecast results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.
include_k_best_models : builtins.int
Number of best models to include.
include_backtesting : builtins.bool
Include backtesting results.
include_discarded_models : builtins.bool
Include discarded models.

Returns

ForecastResults with forecast data
 
return : ForecastResults
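Since `get_fc_results` returns an empty `ForecastResults` while a report is still running, a common pattern is to poll the report status before fetching results. A minimal sketch, assuming you wrap `client.get_report_status` in a callable that yields a plain status string (the status values here are illustrative, not the API's exact vocabulary):

```python
import time
from typing import Callable

def wait_for_results(get_status: Callable[[], str],
                     poll_interval: float = 0.0,
                     max_attempts: int = 100) -> bool:
    """Poll a report status until it reaches a final state.

    `get_status` stands in for a call such as `client.get_report_status(id=...)`;
    here it simply returns a status string ('running', 'finished', 'error').
    """
    for _ in range(max_attempts):
        status = get_status()
        if status == 'finished':
            return True
        if status == 'error':
            return False
        time.sleep(poll_interval)
    raise TimeoutError('report did not finish in time')

# Simulated status sequence: two polls while running, then finished.
statuses = iter(['running', 'running', 'finished'])
print(wait_for_results(lambda: next(statuses)))  # True
```

Once the helper returns `True`, a call such as `client.get_fc_results(id=...)` can be expected to carry results.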
 
def get_matcher_results(self, id: Union[ReportIdentifier, int]) ‑> List[MatcherResult]
Expand source code
@validate_call
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> List[MatcherResult]:
    """Get matcher results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.

    Returns
    -------
    List of MatcherResult objects
    return: builtins.list[futureexpert.matcher.MatcherResult]

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return []

    result = self._request('GET', f'/api/v1/matcher/{report_id}/results')
    return [MatcherResult(**r) for r in result]

Get matcher results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.

Returns

List of MatcherResult objects
 
return : builtins.list[MatcherResult]
 
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview
Expand source code
@validate_call
def get_pool_cov_overview(
    self,
    granularity: Optional[str] = None,
    search: Optional[str] = None
) -> PoolCovOverview:
    """Get overview of available pool covariates.

    Parameters
    ----------
    granularity: typing.Optional[builtins.str]
        Filter by granularity (Day or Month).
    search: typing.Optional[builtins.str]
        Full-text search query.

    Returns
    -------
    PoolCovOverview with available covariates
    return: futureexpert.pool.PoolCovOverview

"""
    params = {}
    if granularity:
        params['granularity'] = granularity
    if search:
        params['search'] = search

    result = self._request('GET', '/api/v1/pool', params=params)
    return PoolCovOverview(overview_json=result['overview_json'])

Get overview of available pool covariates.

Parameters

granularity : typing.Optional[builtins.str]
Filter by granularity (Day or Month).
search : typing.Optional[builtins.str]
Full-text search query.

Returns

PoolCovOverview with available covariates
 
return : PoolCovOverview
 
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) ‑> ReportStatus
Expand source code
@validate_call
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
    """Gets the current status of a report.

    If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    include_error_reason: builtins.bool
        Determines whether log messages are to be included in the result.

    Returns
    -------
    The status of the report.
    return: futureexpert.shared_models.ReportStatus

"""
    identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)

    final_status = self._get_single_report_status(
        report_identifier=identifier, include_error_reason=include_error_reason)
    if isinstance(identifier, ChainedReportIdentifier):
        for prerequisite_identifier in identifier.prerequisites:
            prerequisite_status = self.get_report_status(id=prerequisite_identifier,
                                                         include_error_reason=include_error_reason)
            final_status.prerequisites.append(prerequisite_status)
    return final_status

Gets the current status of a report.

If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
include_error_reason : builtins.bool
Determines whether log messages are to be included in the result.

Returns

The status of the report.
return : ReportStatus
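Every result getter accepts either a `ReportIdentifier` or a plain integer ID and normalizes it internally. The pattern, sketched with a stand-in dataclass (the real class is a pydantic model):

```python
from dataclasses import dataclass
from typing import Optional, Union

@dataclass
class ReportIdentifier:
    """Stand-in for futureexpert.shared_models.ReportIdentifier."""
    report_id: int
    settings_id: Optional[int] = None

def normalize_report_id(id: Union[ReportIdentifier, int]) -> int:
    # The same normalization the client methods apply internally.
    return id.report_id if isinstance(id, ReportIdentifier) else id

print(normalize_report_id(42))                              # 42
print(normalize_report_id(ReportIdentifier(report_id=42)))  # 42
```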
 
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) ‑> str
Expand source code
@validate_call
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
    """Get report type.

    Parameters
    ----------
    report_identifier: typing.Union[builtins.int, futureexpert.shared_models.ReportIdentifier]
        Report ID or identifier.

    Returns
    -------
    Report type string
    return: builtins.str

"""
    report_id = report_identifier.report_id if isinstance(
        report_identifier, ReportIdentifier
    ) else report_identifier

    result = self._request('GET', f'/api/v1/report/{report_id}')
    return str(result['type'])

Get report type.

Parameters

report_identifier : typing.Union[builtins.int, ReportIdentifier]
Report ID or identifier.

Returns

Report type string
 
return : builtins.str
 
def get_reports(self, skip: int = 0, limit: int = 100) ‑> PydanticModelList[ReportSummary]
Expand source code
@validate_call
def get_reports(self, skip: int = 0, limit: int = 100) -> PydanticModelList[ReportSummary]:
    """Get list of available reports.

    Parameters
    ----------
    skip: builtins.int
        Number of items to skip.
    limit: builtins.int
        Maximum number of items to return.

    Returns
    -------
    The available reports from newest to oldest.
    return: futureexpert.shared_models.PydanticModelList[futureexpert.shared_models.ReportSummary]

"""
    params = {'skip': skip, 'limit': limit}
    result = self._request('GET', '/api/v1/report', params=params)
    return PydanticModelList([ReportSummary.model_validate(report) for report in result])

Get list of available reports.

Parameters

skip : builtins.int
Number of items to skip.
limit : builtins.int
Maximum number of items to return.

Returns

The available reports from newest to oldest.
return : PydanticModelList[ReportSummary]
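To page through more than `limit` reports, repeat the call while increasing `skip`. A sketch of that loop, with `get_page` standing in for `client.get_reports`:

```python
from typing import Any, Callable, List

def fetch_all_reports(get_page: Callable[[int, int], List[Any]],
                      page_size: int = 100) -> List[Any]:
    """Collect all reports by paging with skip/limit.

    `get_page(skip, limit)` stands in for
    `client.get_reports(skip=skip, limit=limit)`.
    """
    all_reports: List[Any] = []
    skip = 0
    while True:
        page = get_page(skip, page_size)
        all_reports.extend(page)
        if len(page) < page_size:  # short page -> no more data
            return all_reports
        skip += page_size

# Simulate a backend holding 7 reports, fetched 3 at a time.
backend = list(range(7))
print(fetch_all_reports(lambda skip, limit: backend[skip:skip + limit],
                        page_size=3))  # [0, 1, 2, 3, 4, 5, 6]
```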
 
def get_shaper_results(self, id: Union[ReportIdentifier, int]) ‑> ShaperResult | None
Expand source code
@validate_call
def get_shaper_results(self, id: Union[ReportIdentifier, int]) -> Optional[ShaperResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.

    Returns
    -------
    Results of the SHAPER report.
    return: typing.Optional[futureexpert.shaper.ShaperResult]

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return None

    result = self._request('GET', f'/api/v1/shaper/{report_id}/results')
    return ShaperResult(**result)

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.

Returns

Results of the SHAPER report.
return : typing.Optional[ShaperResult]
 
def get_time_series(self, version_id: str) ‑> CheckInResult
Expand source code
@validate_call
def get_time_series(self, version_id: str) -> CheckInResult:
    """Get time series data by version ID.

    Parameters
    ----------
    version_id: builtins.str
        Time series version ID.

    Returns
    -------
    CheckInResult with time series data
    return: futureexpert.checkin.CheckInResult

"""
    result = self._request('GET', f'/api/v1/ts/{version_id}')
    return CheckInResult(**result)

Get time series data by version ID.

Parameters

version_id : builtins.str
Time series version ID.

Returns

CheckInResult with time series data
 
return : CheckInResult
 
def get_ts_versions(self, skip: int = 0, limit: int = 100) ‑> PydanticModelList[TimeSeriesVersion]
Expand source code
@validate_call
def get_ts_versions(self, skip: int = 0, limit: int = 100) -> PydanticModelList[TimeSeriesVersion]:
    """Get list of time series versions.

    Parameters
    ----------
    skip: builtins.int
        Number of items to skip.
    limit: builtins.int
        Maximum number of items to return.

    Returns
    -------
    List of time series versions
    return: futureexpert.shared_models.PydanticModelList[futureexpert.checkin.TimeSeriesVersion]

"""
    params = {'skip': skip, 'limit': limit}
    results = self._request('GET', '/api/v1/ts', params=params)
    return PydanticModelList([TimeSeriesVersion.model_validate(raw_result) for raw_result in results])

Get list of time series versions.

Parameters

skip : builtins.int
Number of items to skip.
limit : builtins.int
Maximum number of items to return.

Returns

List of time series versions
 
return : PydanticModelList[TimeSeriesVersion]
 
def logout(self) ‑> None
Expand source code
def logout(self) -> None:
    """Logout from futureEXPERT.

    If logged in with a refresh token, the refresh token is revoked.

    Returns
    -------
    return: builtins.NoneType

"""
    if (refresh_token := self._oauth_token.get('refresh_token')) is None:
        raise RuntimeError('Cannot logout without refresh_token')
    self.auth_client.logout(refresh_token)
    logger.info('Successfully logged out.')

Logout from futureEXPERT.

If logged in with a refresh token, the refresh token is revoked.

Returns

return : builtins.NoneType
 
def start_associator(self,
config: AssociatorConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
    """Start an associator report.

    Parameters
    ----------
    config: futureexpert.associator.AssociatorConfig
        Associator configuration.

    Returns
    -------
    ReportIdentifier with report_id and settings_id
    return: futureexpert.shared_models.ReportIdentifier

"""
    payload = {'config': config.model_dump()}
    result = self._request('POST', '/api/v1/associator', json_data=payload)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Associator is running...')
    return report

Start an associator report.

Parameters

config : AssociatorConfig
Associator configuration.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_forecast(self,
version: str,
config: ReportConfig,
reconciliation_config: Optional[ReconciliationConfig] = None) ‑> ReportIdentifier | ChainedReportIdentifier
Expand source code
@validate_call
def start_forecast(
    self,
    version: str,
    config: ReportConfig,
    reconciliation_config: Optional[ReconciliationConfig] = None
) -> Union[ReportIdentifier, ChainedReportIdentifier]:
    """Start a forecasting report.

    Parameters
    ----------
    version: builtins.str
        Time series version ID.
    config: futureexpert.forecast.ReportConfig
        Forecast configuration.
    reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]
        Configuration to make forecasts consistent over hierarchical levels.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    If reconciliation_config is provided, returns ChainedReportIdentifier
    with prerequisites containing the forecast report identifier.
    return: typing.Union[futureexpert.shared_models.ReportIdentifier, futureexpert.shared_models.ChainedReportIdentifier]

"""
    payload: Dict[str, Any] = {
        'version': version,
        'config': config.model_dump()
    }

    if reconciliation_config is not None:
        payload['reconciliation_config'] = reconciliation_config.model_dump()

    logger.info('Started creating FORECAST...')
    result = self._request('POST', '/api/v1/forecast', json_data=payload)

    identifier_model = ChainedReportIdentifier if 'prerequisites' in result else ReportIdentifier
    report_identifier = identifier_model.model_validate(result)
    logger.info(f'Report created with ID {report_identifier.report_id}. Forecasts are running...')
    return report_identifier

Start a forecasting report.

Parameters

version : builtins.str
Time series version ID.
config : ReportConfig
Forecast configuration.
reconciliation_config : typing.Optional[ReconciliationConfig]
Configuration to make forecasts consistent over hierarchical levels.

Returns

ReportIdentifier with report_id and settings_id.
If reconciliation_config is provided, returns ChainedReportIdentifier
with prerequisites containing the forecast report identifier.

return : typing.Union[ReportIdentifier, ChainedReportIdentifier]
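The response shape decides the identifier type: a `prerequisites` entry yields a `ChainedReportIdentifier`, otherwise a plain `ReportIdentifier`. The dispatch, sketched with stand-in dataclasses (the real classes are pydantic models):

```python
from dataclasses import dataclass, field
from typing import List, Union

@dataclass
class ReportIdentifier:
    """Stand-in for futureexpert.shared_models.ReportIdentifier."""
    report_id: int

@dataclass
class ChainedReportIdentifier(ReportIdentifier):
    """Stand-in carrying the prerequisite (forecast) report identifiers."""
    prerequisites: List[ReportIdentifier] = field(default_factory=list)

def parse_identifier(result: dict) -> Union[ReportIdentifier, ChainedReportIdentifier]:
    # Mirrors the client's choice: a 'prerequisites' key in the
    # response indicates a chained (reconciled) report.
    if 'prerequisites' in result:
        return ChainedReportIdentifier(
            report_id=result['report_id'],
            prerequisites=[ReportIdentifier(report_id=p['report_id'])
                           for p in result['prerequisites']])
    return ReportIdentifier(report_id=result['report_id'])

plain = parse_identifier({'report_id': 10})
chained = parse_identifier({'report_id': 11,
                            'prerequisites': [{'report_id': 10}]})
```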

def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, Path, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def start_forecast_from_raw_data(self,
                                 raw_data_source: Union[pd.DataFrame, Path, str],
                                 config_fc: ReportConfig,
                                 data_definition: Optional[DataDefinition] = None,
                                 config_ts_creation: Optional[TsCreationConfig] = None,
                                 config_checkin: Optional[str] = None,
                                 file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
    """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
        A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
    config_fc: futureexpert.forecast.ReportConfig
        The configuration of the forecast run.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns should be removed.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Defines filter and aggregation level of the time series.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The identifier of the forecasting report.
    return: futureexpert.shared_models.ReportIdentifier

"""

    assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.'

    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    res2 = self.create_time_series(user_input_id=user_input_id,
                                   file_uuid=file_id,
                                   data_definition=data_definition,
                                   config_ts_creation=config_ts_creation,
                                   config_checkin=config_checkin,
                                   file_specification=file_specification)

    version = res2['result']['tsVersion']
    return self.start_forecast(version=version, config=config_fc)

Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

Parameters

raw_data_source : typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc : ReportConfig
The configuration of the forecast run.
data_definition : typing.Optional[DataDefinition]
Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation : typing.Optional[TsCreationConfig]
Defines filter and aggregation level of the time series.
config_checkin : typing.Optional[builtins.str]
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

The identifier of the forecasting report.
return : ReportIdentifier
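When the interim CHECK-IN results do need inspection, the same flow can be run step by step. A sketch against a stub client (the response shapes mirror the source code above; the stub values are placeholders):

```python
class StubClient:
    """Stand-in for ExpertClient, returning canned responses with the
    shapes the real endpoints use."""

    def upload_data(self, source, file_specification=None):
        return {'uuid': 'user-input-1', 'files': [{'uuid': 'file-1'}]}

    def create_time_series(self, user_input_id, file_uuid, **kwargs):
        return {'result': {'tsVersion': 'v1'}}

    def start_forecast(self, version, config):
        return {'report_id': 1, 'version': version}

client = StubClient()

# Step 1: upload the raw data and note the returned identifiers.
feedback = client.upload_data(source='sales.csv')
user_input_id = feedback['uuid']
file_uuid = feedback['files'][0]['uuid']

# Step 2: create time series; the result can be inspected here.
creation = client.create_time_series(user_input_id=user_input_id,
                                     file_uuid=file_uuid)
version = creation['result']['tsVersion']

# Step 3: start the forecast on the new time series version.
report = client.start_forecast(version=version, config=None)
```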
 
def start_making_forecast_consistent(self,
config: MakeForecastConsistentConfiguration) ‑> ReportIdentifier
Expand source code
@validate_call
def start_making_forecast_consistent(
    self,
    config: MakeForecastConsistentConfiguration
) -> ReportIdentifier:
    """Start hierarchical forecast reconciliation process.

    Makes forecasts consistent across hierarchical levels.

    Parameters
    ----------
    config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration
        Configuration for the reconciliation process.

    Returns
    -------
    ReportIdentifier with report_id and settings_id
    return: futureexpert.shared_models.ReportIdentifier

"""
    payload: Dict[str, Any] = {
        'data_selection': config.data_selection.model_dump(),
        'report_note': config.report_note
    }

    if config.db_name:
        payload['db_name'] = config.db_name
    if config.reconciliation:
        payload['reconciliation'] = config.reconciliation.model_dump()

    logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
    result = self._request('POST', '/api/v1/forecast/reconcile', json_data=payload)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...')
    return report

Start hierarchical forecast reconciliation process.

Makes forecasts consistent across hierarchical levels.

Parameters

config : MakeForecastConsistentConfiguration
Configuration for the reconciliation process.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_matcher(self,
config: MatcherConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
    """Start a covariate matcher report.

    Parameters
    ----------
    config: futureexpert.matcher.MatcherConfig
        Matcher configuration.

    Returns
    -------
    ReportIdentifier with report_id and settings_id
    return: futureexpert.shared_models.ReportIdentifier

"""
    payload = {'config': config.model_dump()}
    result = self._request('POST', '/api/v1/matcher', json_data=payload)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Matching indicators...')
    return report

Start a covariate matcher report.

Parameters

config : MatcherConfig
Matcher configuration.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_scenario_forecast(self, config: ShaperConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_scenario_forecast(self, config: ShaperConfig) -> ReportIdentifier:
    """Start forecast for scenarios.

    Parameters
    ----------
    config: futureexpert.shaper.ShaperConfig
        Configuration for a SHAPER run.

    Returns
    -------
    ReportIdentifier with report_id and settings_id
    return: futureexpert.shared_models.ReportIdentifier

"""
    ref_config = copy.deepcopy(config)
    for scenario in ref_config.scenarios:
        if isinstance(scenario.ts, Covariate):
            scenario.ts = CovariateRef(name=scenario.ts.ts.name, lag=scenario.ts.lag)

    payload = {'config': ref_config.model_dump(mode='json')}

    result = self._request('POST', '/api/v1/shaper', json_data=payload)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Shaping scenarios...')
    return report

Start forecast for scenarios.

Parameters

config : ShaperConfig
Configuration for a SHAPER run.

Returns

return : ReportIdentifier
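Before submission, full `Covariate` objects in the scenarios are reduced to lightweight `CovariateRef` references (name plus lag). The substitution, sketched with stand-in dataclasses (the real classes are pydantic models):

```python
import copy
from dataclasses import dataclass
from typing import List, Union

# Stand-ins for the futureexpert covariate and scenario models.
@dataclass
class TimeSeries:
    name: str

@dataclass
class Covariate:
    ts: TimeSeries
    lag: int

@dataclass
class CovariateRef:
    name: str
    lag: int

@dataclass
class Scenario:
    ts: Union[Covariate, CovariateRef]

def to_refs(scenarios: List[Scenario]) -> List[Scenario]:
    # Mirrors what start_scenario_forecast does before building the
    # payload: full covariates become name + lag references.
    out = copy.deepcopy(scenarios)
    for scenario in out:
        if isinstance(scenario.ts, Covariate):
            scenario.ts = CovariateRef(name=scenario.ts.ts.name,
                                       lag=scenario.ts.lag)
    return out

slim = to_refs([Scenario(ts=Covariate(ts=TimeSeries(name='oil_price'), lag=2))])
```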
 
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def upload_data(
    self,
    source: Union[pd.DataFrame, str],
    file_specification: Optional[FileSpecification] = None
) -> Any:
    """Upload raw data for further processing.

    Parameters
    ----------
    source: typing.Union[pandas.DataFrame, builtins.str]
        Path to a CSV file or a pandas DataFrame.
    file_specification: typing.Optional[futureexpert.checkin.FileSpecification]
        File specification for CSV parsing.

    Returns
    -------
    Upload feedback with user_input_id and file_uuid
    return: typing.Any

"""
    if isinstance(source, pd.DataFrame):
        # Convert DataFrame to JSON for upload
        data_json = source.to_dict(orient='records')
        form_data = {
            'data': json.dumps(data_json)
        }
        if file_specification:
            form_data['file_specification'] = json.dumps(file_specification.model_dump())

        return self._request('POST', '/api/v1/check-in/data', data=form_data)
    else:
        # Upload file
        with open(source, 'rb') as f:
            files = {'file': f}
            data = {}
            if file_specification:
                data['file_specification'] = json.dumps(file_specification.model_dump())

            return self._request('POST', '/api/v1/check-in/data', files=files, data=data)

Upload raw data for further processing.

Parameters

source : typing.Union[pandas.DataFrame, builtins.str]
Path to a CSV file or a pandas DataFrame.
file_specification : typing.Optional[FileSpecification]
File specification for CSV parsing.

Returns

Upload feedback with user_input_id and file_uuid
 
return : typing.Any
 
class FileSpecification (**data: Any)
Expand source code
class FileSpecification(BaseConfig):
    """Specify the format of the CSV file.

    Parameters
    ----------
    delimiter: typing.Optional[builtins.str]
        The delimiter used to separate values.
    decimal: typing.Optional[builtins.str]
        The decimal character used in decimal numbers.
    thousands: typing.Optional[builtins.str]
        The thousands separator used in numbers.
    """
    delimiter: Optional[str] = ','
    decimal: Optional[str] = '.'
    thousands: Optional[str] = None

Specify the format of the CSV file.

Parameters

delimiter : typing.Optional[builtins.str]
The delimiter used to separate values.
decimal : typing.Optional[builtins.str]
The decimal character used in decimal numbers.
thousands : typing.Optional[builtins.str]
The thousands separator used in numbers.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var decimal : str | None
var delimiter : str | None
var model_config
var thousands : str | None
class FilterSettings (**data: Any)
Expand source code
class FilterSettings(BaseConfig):
    """Model for the filters.

    Parameters
    ----------
    type: typing.Literal['exclusion', 'inclusion']
        The type of filter: `exclusion` or `inclusion`.
    variable: builtins.str
        The column name to be used for filtering.
    items: builtins.list[builtins.str]
        The list of values to be used for filtering.
    """
    type: Literal['exclusion', 'inclusion']
    variable: str
    items: list[str]

Model for the filters.

Parameters

type : typing.Literal['exclusion', 'inclusion']
The type of filter: exclusion or inclusion.
variable : builtins.str
The column name to be used for filtering.
items : builtins.list[builtins.str]
The list of values to be used for filtering.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var items : list[str]
var model_config
var type : Literal['exclusion', 'inclusion']
var variable : str
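A minimal sketch of how an `inclusion` versus `exclusion` filter behaves, using plain dicts as rows; `apply_filter` is a hypothetical stand-in for illustration, not part of the package API.

```python
from typing import Any, Literal

def apply_filter(rows: list[dict[str, Any]],
                 type: Literal['exclusion', 'inclusion'],
                 variable: str,
                 items: list[str]) -> list[dict[str, Any]]:
    """Keep rows whose `variable` value is (inclusion) or is not (exclusion) in `items`."""
    if type == 'inclusion':
        return [r for r in rows if r[variable] in items]
    return [r for r in rows if r[variable] not in items]

rows = [{'region': 'EU'}, {'region': 'US'}, {'region': 'APAC'}]
print(apply_filter(rows, 'inclusion', 'region', ['EU', 'US']))
```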
class ForecastingConfig (**data: Any)
Expand source code
class ForecastingConfig(BaseConfig):
    """Forecasting configuration.

    Parameters
    ----------
    fc_horizon
        Forecast horizon.
    round_forecast_to_integer
        If true, then forecasts are rounded to the nearest integer (also applied during backtesting).
    use_ensemble
        If true, then calculate ensemble forecasts. Automatically makes a smart decision on which
        methods to use based on their backtesting performance.
    lower_bound
        Lower bound applied to the time series and forecasts.
    upper_bound
        Upper bound applied to the time series and forecasts.
    confidence_level
        Confidence level for prediction intervals.
    skip_empirical_prediction_intervals
        If true, empirical prediction intervals for confidence levels are not calculated.
        This does not affect models that generate their own prediction intervals.
        Disabling this can affect model selection,
        as plausibility checks on the intervals are also omitted.
        Setting this to `True` also removes the minimum forecast horizon needed for the intervals,
        allowing for a shorter `fc_horizon` during backtesting when defined via `step_weights`.
    working_day_adaptions
        If present, enables optional working day adaptions of the time series and forecasts.
        This is currently not compatible with use_ensemble=True.
    forecast_minimum_version
        Optional version ID of time series containing minimum forecast values.
        Forecast minimums must match time series via grouping columns and granularity.
        Dates must be within the forecasting horizon.
    extension_strategy
        Determines how to extend the forecast if the provided covariates do not cover the full forecast horizon.

        Options:
        * "switch":
          Uses the covariate-based model for as many steps as possible, then switches to the non-covariate model
          for the remaining fc steps.
          Note: This joins two independent forecasts, which may result in a visible discontinuity at the switch point.

        * "smooth":
          Uses the covariate-based predictions to generate the remaining fc steps using the non-covariate model.
          By treating the initial predictions as historical data for the second model, this strategy ensures
          a smoother transition between the two phases.
    """

    fc_horizon: Annotated[ValidatedPositiveInt, Field(ge=1, le=60)]
    round_forecast_to_integer: bool = False
    use_ensemble: bool = False
    lower_bound: Union[float, None] = None
    upper_bound: Union[float, None] = None
    confidence_level: float = 0.75
    skip_empirical_prediction_intervals: bool = False
    working_day_adaptions: Optional[WorkingDayAdaptionsConfig] = None
    forecast_minimum_version: Optional[str] = None
    extension_strategy: Literal['smooth', 'switch'] = 'smooth'

    @property
    def numeric_bounds(self) -> tuple[float, float]:
        return (
            self.lower_bound if self.lower_bound is not None else -np.inf,
            self.upper_bound if self.upper_bound is not None else np.inf,
        )

    @model_validator(mode='after')
    def ensemble_incompatible_with_working_days(self) -> Self:
        """Validator for combination of ensemble model and working day adaptions.

    Returns
    -------
    typing.Self
    """
        if self.use_ensemble and self.working_day_adaptions is not None:
            raise ValueError('use_ensemble and working_day_adaptions cannot be used together.')
        return self

Forecasting configuration.

Parameters

fc_horizon
Forecast horizon.
round_forecast_to_integer
If true, then forecasts are rounded to the nearest integer (also applied during backtesting).
use_ensemble
If true, then calculate ensemble forecasts. Automatically makes a smart decision on which methods to use based on their backtesting performance.
lower_bound
Lower bound applied to the time series and forecasts.
upper_bound
Upper bound applied to the time series and forecasts.
confidence_level
Confidence level for prediction intervals.
skip_empirical_prediction_intervals

If true, empirical prediction intervals for confidence levels are not calculated. This does not affect models that generate their own prediction intervals.

Disabling this can affect model selection, as plausibility checks on the intervals are also omitted. Setting this to True also removes the minimum forecast horizon needed for the intervals, allowing for a shorter fc_horizon during backtesting when defined via step_weights.

working_day_adaptions
If present, enables optional working day adaptions of the time series and forecasts. This is currently not compatible with use_ensemble=True.
forecast_minimum_version
Optional version ID of time series containing minimum forecast values. Forecast minimums must match time series via grouping columns and granularity. Dates must be within the forecasting horizon.
extension_strategy

Determines how to extend the forecast if the provided covariates do not cover the full forecast horizon.

Options:

  • "switch": Uses the covariate-based model for as many steps as possible, then switches to the non-covariate model for the remaining fc steps. Note: This joins two independent forecasts, which may result in a visible discontinuity at the switch point.

  • "smooth": Uses the covariate-based predictions to generate the remaining fc steps using the non-covariate model. By treating the initial predictions as historical data for the second model, this strategy ensures a smoother transition between the two phases.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var confidence_level : float
var extension_strategy : Literal['smooth', 'switch']
var fc_horizon : ValidatedPositiveInt
var forecast_minimum_version : str | None
var lower_bound : float | None
var model_config
var round_forecast_to_integer : bool
var skip_empirical_prediction_intervals : bool
var upper_bound : float | None
var use_ensemble : bool
var working_day_adaptions : WorkingDayAdaptionsConfig | None

Instance variables

prop numeric_bounds : tuple[float, float]
Expand source code
@property
def numeric_bounds(self) -> tuple[float, float]:
    return (
        self.lower_bound if self.lower_bound is not None else -np.inf,
        self.upper_bound if self.upper_bound is not None else np.inf,
    )

Methods

def ensemble_incompatible_with_working_days(self) ‑> Self
Expand source code
@model_validator(mode='after')
def ensemble_incompatible_with_working_days(self) -> Self:
    """Validator for combination of ensemble model and working day adaptions.

Returns
-------
typing.Self
"""
    if self.use_ensemble and self.working_day_adaptions is not None:
        raise ValueError('use_ensemble and working_day_adaptions cannot be used together.')
    return self

Validator for combination of ensemble model and working day adaptions.

Returns

typing.Self
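The `numeric_bounds` property substitutes infinities for unset bounds. A minimal stdlib sketch of the same logic (using `math.inf` in place of `np.inf`; the standalone function is for illustration only, not part of the package):

```python
import math
from typing import Optional

def numeric_bounds(lower_bound: Optional[float],
                   upper_bound: Optional[float]) -> tuple[float, float]:
    """Mirror ForecastingConfig.numeric_bounds: unset bounds become +/- infinity."""
    return (lower_bound if lower_bound is not None else -math.inf,
            upper_bound if upper_bound is not None else math.inf)

print(numeric_bounds(0.0, None))  # (0.0, inf)
```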
 
class LagSelectionConfig (**data: Any)
Expand source code
class LagSelectionConfig(BaseModel):
    """Configures covariate lag selection.

    Parameters
    ----------
    fixed_lags: typing.Optional[builtins.list[builtins.int]]
        Lags that are tested in the lag selection.
    min_lag: typing.Optional[builtins.int]
        Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate
        is shifted 3 data points into the future.
    max_lag: typing.Optional[builtins.int]
        Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate
        is shifted 12 data points into the future.
    """
    min_lag: Optional[int] = None
    max_lag: Optional[int] = None
    fixed_lags: Optional[list[int]] = None

    @model_validator(mode='after')
    def _check_range(self) -> Self:
        if (self.min_lag is None) ^ (self.max_lag is None):
            raise ValueError(
                'If one of `min_lag` and `max_lag` is set the other one also needs to be set.')

        if self.min_lag and self.max_lag:
            if self.fixed_lags is not None:
                raise ValueError('Fixed lags and min/max lag are mutually exclusive.')
            if self.max_lag < self.min_lag:
                raise ValueError('max_lag needs to be greater or equal to min_lag.')
            lag_range = abs(self.max_lag - self.min_lag) + 1
            if lag_range > 15:
                raise ValueError(f'Only 15 lags are allowed to be tested. The requested range has length {lag_range}.')

        if self.fixed_lags and len(self.fixed_lags) > 15:
            raise ValueError(
                f'Only 15 lags are allowed to be tested. The provided fixed lags has length {len(self.fixed_lags)}.')

        return self

Configures covariate lag selection.

Parameters

fixed_lags : typing.Optional[builtins.list[builtins.int]]
Lags that are tested in the lag selection.
min_lag : typing.Optional[builtins.int]
Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate is shifted 3 data points into the future.
max_lag : typing.Optional[builtins.int]
Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate is shifted 12 data points into the future.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var fixed_lags : list[int] | None
var max_lag : int | None
var min_lag : int | None
var model_config
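The validation rules enforced by `LagSelectionConfig` can be restated as a plain function; `check_lag_selection` below is a hedged sketch for illustration, not part of the package.

```python
from typing import Optional

def check_lag_selection(min_lag: Optional[int] = None,
                        max_lag: Optional[int] = None,
                        fixed_lags: Optional[list[int]] = None) -> None:
    """Restate LagSelectionConfig's rules: min/max together, no mix with fixed lags, at most 15 lags."""
    if (min_lag is None) ^ (max_lag is None):
        raise ValueError('If one of min_lag and max_lag is set, the other one also needs to be set.')
    if min_lag is not None and max_lag is not None:
        if fixed_lags is not None:
            raise ValueError('Fixed lags and min/max lag are mutually exclusive.')
        if max_lag < min_lag:
            raise ValueError('max_lag needs to be greater or equal to min_lag.')
        if max_lag - min_lag + 1 > 15:
            raise ValueError('Only 15 lags are allowed to be tested.')
    if fixed_lags and len(fixed_lags) > 15:
        raise ValueError('Only 15 lags are allowed to be tested.')

check_lag_selection(min_lag=1, max_lag=12)  # valid: a range of 12 lags
```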
class MakeForecastConsistentConfiguration (**data: Any)
Expand source code
class MakeForecastConsistentConfiguration(BaseConfig):
    """Service configuration.

    Parameters
    ----------
    data_selection: futureexpert.forecast_consistency.MakeForecastConsistentDataSelection
        Configuration on the selection of time series and forecasts used for carrying out the reconciliation.
    report_note: builtins.str
        Note of the report.
    db_name: typing.Optional[builtins.str]
        Only accessible for internal use. Name of the database to use for storing the results.
    reconciliation: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]
        Optional reconciliation configuration. If not provided, defaults will be used.
    """
    data_selection: MakeForecastConsistentDataSelection
    report_note: str
    db_name: Optional[str] = None
    reconciliation: Optional[ReconciliationConfig] = None

Service configuration.

Parameters

data_selection : MakeForecastConsistentDataSelection
Configuration on the selection of time series and forecasts used for carrying out the reconciliation.
report_note : builtins.str
Note of the report.
db_name : typing.Optional[builtins.str]
Only accessible for internal use. Name of the database to use for storing the results.
reconciliation : typing.Optional[ReconciliationConfig]
Optional reconciliation configuration. If not provided, defaults will be used.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var data_selection : MakeForecastConsistentDataSelection
var db_name : str | None
var model_config
var reconciliation : ReconciliationConfig | None
var report_note : str
class MakeForecastConsistentDataSelection (**data: Any)
Expand source code
class MakeForecastConsistentDataSelection(BaseConfig):
    """Forecast and time series selection for making forecast consistent.

    Parameters
    ----------
    version: builtins.str
        Time series version to be used.
    fc_report_id: builtins.int
        The identifier of the forecasting report to be used.
    forecast_minimum_version: typing.Optional[builtins.str]
        Optional version ID of time series containing minimum forecast values.
        Forecast minimums must match time series via grouping columns and granularity.
        Dates must be within the forecasting horizon.
    """
    version: str
    fc_report_id: int
    forecast_minimum_version: Optional[str] = None

Forecast and time series selection for making forecast consistent.

Parameters

version : builtins.str
Time series version to be used.
fc_report_id : builtins.int
The identifier of the forecasting report to be used.
forecast_minimum_version : typing.Optional[builtins.str]
Optional version ID of time series containing minimum forecast values. Forecast minimums must match time series via grouping columns and granularity. Dates must be within the forecasting horizon.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var fc_report_id : int
var forecast_minimum_version : str | None
var model_config
var version : str
class MatcherConfig (**data: Any)
Expand source code
class MatcherConfig(BaseConfig):
    """Configuration for a MATCHER run.

    Parameters
    ----------
    title: builtins.str
        A short description of the report.
    actuals_version: builtins.str
        The version ID of the actuals.
    covs_versions: builtins.list[builtins.str]
        List of versions of the covariates.
    actuals_filter: builtins.dict[builtins.str, typing.Any]
        Filter criterion for actuals time series. The given actuals version is
        automatically added as an additional filter criterion. Possible filter criteria are all fields that
        are part of the TimeSeries class, e.g. {'name': 'Sales'}.
        For more complex filters check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    covs_filter: builtins.dict[builtins.str, typing.Any]
        Filter criterion for covariates time series. The given covariate version is
        automatically added as an additional filter criterion. Possible filter criteria are all fields that
        are part of the TimeSeries class, e.g. {'name': 'Sales'}.
        For more complex filters check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    max_ts_len: typing.Optional[builtins.int]
        At most this number of the most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG
        for allowed configuration.
    lag_selection: futureexpert.matcher.LagSelectionConfig
        Configuration of covariate lag selection.
    evaluation_start_date: typing.Optional[builtins.str]
        Optional start date for the evaluation. The input should be in the ISO format
        with date and time, 'YYYY-mm-DDTHH-MM-SS', e.g., '2024-01-01T16:40:00'.
        Actuals and covariate observations prior to this start date are dropped.
    evaluation_end_date: typing.Optional[builtins.str]
        Optional end date for the evaluation. The input should be in the ISO format
        with date and time, 'YYYY-mm-DDTHH-MM-SS', e.g., '2024-01-01T16:40:00'.
        Actuals and covariate observations after this end date are dropped.
    max_publication_lag: builtins.int
        Maximal publication lag for the covariates. The publication lag of a covariate
        is the number of most recent observations (compared to the actuals) that are
        missing for the covariate. E.g., if the actuals (for monthly granularity) end
        in April 2023 but the covariate ends in February 2023, the covariate has a
        publication lag of 2.
    associator_report_id: typing.Optional[builtins.int]
        Optional report id of clustering results. If None, the database is searched for a fitting clustering.
        The clustering results are used in the post-selection to thin out overly large selections, the idea
        being that covariates from the same cluster would give similar results in forecasting. Only used if
        `use_clustering_results` is true.
    use_clustering_results: builtins.bool
        If true, clustering results are used.
    post_selection_queries: builtins.list[builtins.str]
        List of queries that are executed on the ranking summary DataFrame. Only ranking entries that
        match the queries are kept. The query strings need to satisfy the pandas query syntax
        (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns
        of the ranking summary DataFrame that you might want to filter on:

        Column Name          |      Data Type   |    Description
        -----------------------------------------------------------------------------------------------
        Lag                  |          Int64   |    Lag of the covariate.
        Rank                 |        float64   |    Rank of the model.
        BetterThanNoCov      |           bool   |    Indicates whether the model is better than the non-cov model.
    enable_leading_covariate_selection: builtins.bool
        When True, all covariates that, after the lag is applied, do not have at least one more
        datapoint beyond the time period covered by the actuals are removed from the candidate
        covariates passed to covariate selection.
    fixed_season_length: typing.Optional[builtins.int]
        An optional parameter specifying the length of a season in the dataset.
    pool_covs: typing.Optional[builtins.list[futureexpert.pool.PoolCovDefinition]]
        List of covariate definitions. Only available in `Standard`, `Premium` and `Enterprise` subscription packages.
    db_name: typing.Optional[builtins.str]
        Only accessible for internal use. Name of the database to use for storing the results.
    rerun_report_id: typing.Optional[builtins.int]
        ReportId from which failed runs should be recomputed.
        Make sure to use the same ts_version; otherwise, all time series are computed again.
    rerun_status: list[typing.Literal['Error', 'NoEvaluation']]
        Status of the runs that should be computed again. `Error` and/or `NoEvaluation`.
    """
    title: str
    actuals_version: str
    covs_versions: list[str] = Field(default_factory=list)
    actuals_filter: dict[str, Any] = Field(default_factory=dict)
    covs_filter: dict[str, Any] = Field(default_factory=dict)
    max_ts_len: Annotated[
        Optional[int], pydantic.Field(ge=1, le=1500)] = None
    lag_selection: LagSelectionConfig = LagSelectionConfig()
    evaluation_start_date: Optional[str] = None
    evaluation_end_date: Optional[str] = None
    max_publication_lag: int = 2
    associator_report_id: Optional[pydantic.PositiveInt] = None
    use_clustering_results: bool = False
    post_selection_queries: list[str] = []
    enable_leading_covariate_selection: bool = True
    fixed_season_length: Optional[int] = None
    pool_covs: Optional[list[PoolCovDefinition]] = None
    db_name: Optional[str] = None
    rerun_report_id: Optional[int] = None
    rerun_status: list[RerunStatus] = ['Error']

    @model_validator(mode='after')
    def _validate_post_selection_queries(self) -> Self:
        # Validate the post-selection queries.
        invalid_queries = []
        columns = {
            'Lag': 'int',
            'Rank': 'float',
            'BetterThanNoCov': 'bool'
        }
        # Create an empty DataFrame with the specified column names and data types
        validation_df = pd.DataFrame(columns=columns.keys()).astype(columns)
        for postselection_query in self.post_selection_queries:
            try:
                validation_df.query(postselection_query, )
            except Exception:
                invalid_queries.append(postselection_query)

        if len(invalid_queries):
            raise ValueError('The following post-selection queries are invalidly formatted: '
                             f'{", ".join(invalid_queries)}. ')

        return self

    @model_validator(mode='after')
    def _validate_rerun_report_id(self) -> Self:

        if self.rerun_report_id is not None and self.pool_covs is not None:
            raise ValueError('rerun_report_id can not be used with pool_covs. '
                             'Use the exact covs_version used in the rerun_report_id.')

        return self

Configuration for a MATCHER run.

Parameters

title : builtins.str
A short description of the report.
actuals_version : builtins.str
The version ID of the actuals.
covs_versions : builtins.list[builtins.str]
List of versions of the covariates.
actuals_filter : builtins.dict[builtins.str, typing.Any]
Filter criterion for actuals time series. The given actuals version is automatically added as an additional filter criterion. Possible filter criteria are all fields that are part of the TimeSeries class, e.g. {'name': 'Sales'}. For more complex filters check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
covs_filter : builtins.dict[builtins.str, typing.Any]
Filter criterion for covariates time series. The given covariate version is automatically added as an additional filter criterion. Possible filter criteria are all fields that are part of the TimeSeries class, e.g. {'name': 'Sales'}. For more complex filters check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len : typing.Optional[builtins.int]
At most this number of the most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
lag_selection : LagSelectionConfig
Configuration of covariate lag selection.
evaluation_start_date : typing.Optional[builtins.str]
Optional start date for the evaluation. The input should be in the ISO format with date and time, 'YYYY-mm-DDTHH-MM-SS', e.g., '2024-01-01T16:40:00'. Actuals and covariate observations prior to this start date are dropped.
evaluation_end_date : typing.Optional[builtins.str]
Optional end date for the evaluation. The input should be in the ISO format with date and time, 'YYYY-mm-DDTHH-MM-SS', e.g., '2024-01-01T16:40:00'. Actuals and covariate observations after this end date are dropped.
max_publication_lag : builtins.int
Maximal publication lag for the covariates. The publication lag of a covariate is the number of most recent observations (compared to the actuals) that are missing for the covariate. E.g., if the actuals (for monthly granularity) end in April 2023 but the covariate ends in February 2023, the covariate has a publication lag of 2.
associator_report_id : typing.Optional[builtins.int]
Optional report id of clustering results. If None, the database is searched for a fitting clustering. The clustering results are used in the post-selection to thin out overly large selections, the idea being that covariates from the same cluster would give similar results in forecasting. Only used if use_clustering_results is true.
use_clustering_results : builtins.bool
If true, clustering results are used.
post_selection_queries : builtins.list[builtins.str]

List of queries that are executed on the ranking summary DataFrame. Only ranking entries that match the queries are kept. The query strings need to satisfy the pandas query syntax (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns of the ranking summary DataFrame that you might want to filter on:

Column Name | Data Type | Description
----------- | --------- | -----------
Lag | Int64 | Lag of the covariate.
Rank | float64 | Rank of the model.
BetterThanNoCov | bool | Indicates whether the model is better than the non-cov model.

enable_leading_covariate_selection : builtins.bool
When True, all covariates that, after the lag is applied, do not have at least one more datapoint beyond the time period covered by the actuals are removed from the candidate covariates passed to covariate selection.
fixed_season_length : typing.Optional[builtins.int]
An optional parameter specifying the length of a season in the dataset.
pool_covs : typing.Optional[builtins.list[PoolCovDefinition]]
List of covariate definitions. Only available in Standard, Premium and Enterprise subscription packages.
db_name : typing.Optional[builtins.str]
Only accessible for internal use. Name of the database to use for storing the results.
rerun_report_id : typing.Optional[builtins.int]
ReportId from which failed runs should be recomputed. Make sure to use the same ts_version; otherwise, all time series are computed again.
rerun_status : list[typing.Literal['Error', 'NoEvaluation']]
Status of the runs that should be computed again. Error and/or NoEvaluation.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var actuals_filter : dict[str, typing.Any]
var actuals_version : str
var associator_report_id : int | None
var covs_filter : dict[str, typing.Any]
var covs_versions : list[str]
var db_name : str | None
var enable_leading_covariate_selection : bool
var evaluation_end_date : str | None
var evaluation_start_date : str | None
var fixed_season_length : int | None
var lag_selectionLagSelectionConfig
var max_publication_lag : int
var max_ts_len : int | None
var model_config
var pool_covs : list[PoolCovDefinition] | None
var post_selection_queries : list[str]
var rerun_report_id : int | None
var rerun_status : list[typing.Literal['Error', 'NoEvaluation']]
var title : str
var use_clustering_results : bool
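The `max_publication_lag` semantics can be illustrated with the monthly example from the docstring (actuals end April 2023, covariate ends February 2023, so the publication lag is 2). The helper below is a hypothetical sketch for monthly granularity, not part of the package.

```python
import datetime as dt

def publication_lag_months(actuals_end: dt.date, cov_end: dt.date) -> int:
    """Number of monthly observations the covariate is missing relative to the actuals."""
    return (actuals_end.year - cov_end.year) * 12 + (actuals_end.month - cov_end.month)

# Example from the docs: actuals end April 2023, covariate ends February 2023.
lag = publication_lag_months(dt.date(2023, 4, 1), dt.date(2023, 2, 1))
print(lag)  # 2 -> within the default max_publication_lag of 2
```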
class MethodSelectionConfig (**data: Any)
Expand source code
class MethodSelectionConfig(BaseConfig):
    """Method selection configuration.

    Parameters
    ----------
    number_iterations: futureexpert.shared_models.PositiveInt
        Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals.
    shift_len: futureexpert.shared_models.PositiveInt
        Number of time points by which the test window is shifted between backtesting iterations.
    backtesting_strategy: typing.Literal['standard', 'equal_coverage']
        Selects the methodology for backtesting.
        - 'standard': A standard rolling forecast. The evaluation window with fixed length is shifted at each step.
        This strategy is controlled by `number_iterations` and `shift_len`.
        - 'equal_coverage': A balanced strategy that guarantees every data point within the `equal_coverage_size`
        is forecasted the same number of times. This strategy has specific requirements: It uses a `shift_len`
        of 1 and the number of iterations is calculated automatically based on the `equal_coverage_size`
        and forecast horizon, ignoring the `number_iterations` parameter.
    equal_coverage_size: typing.Optional[futureexpert.shared_models.PositiveInt]
        Number of recent data points to test when the `equal_coverage` backtesting strategy is active.
        If None, or if the chosen length is too long, the most common season length for the time series granularity is used instead.
    refit: builtins.bool
        If true, then models are refitted for each backtesting iteration.
    default_error_metric: typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']
        Error metric applied to the backtesting error for non-sporadic time series.
    sporadic_error_metric: typing.Literal['pis', 'sapis', 'acr', 'mar', 'msr']
        Error metric applied to the backtesting errors for sporadic time series.
    additional_accuracy_measures: list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]
        Additional accuracy measures for solely reporting purposes.
        Does not affect internal evaluation or model ranking.
    step_weights: typing.Optional[builtins.dict[futureexpert.shared_models.PositiveInt, builtins.float]]
        Mapping from forecast steps to weights associated to forecast errors for that forecasting step.
        - Purpose: Applied only on error-metrics for non-sporadic time series.
        - Weights: Only positive weights are allowed.
        If a forecast step is not included in the dictionary, it will be assigned a weight of zero.
        - Forecast Horizon: The highest key in this dictionary defines the forecast horizon
        for backtesting, if `skip_empirical_prediction_intervals` is set to `True`.
    additional_cov_method: typing.Optional[typing.Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost']]
        Define up to one additional method that uses the defined covariates for creating forecasts. Will not be
        calculated if deemed unfit by the preselection. If the parameter `forecasting_methods`
        is defined, the additional cov method must appear in that list, too.

        Only available in `Standard`, `Premium` and `Enterprise` subscription packages.
    cov_combination: typing.Literal['single', 'joint']
        Create a forecast model for each individual covariate (single)
        or a model using all covariates together (joint).

        `single` is only available in `Standard`, `Premium` and `Enterprise` subscription packages.
    forecasting_methods: typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
        Define specific forecasting methods to be tested for generating forecasts.
        Specifying fewer methods can significantly reduce the runtime of forecast creation.
        If not specified, all available forecasting methods will be used by default.
        Given methods are automatically preselected based on time series characteristics of your data.
        If none of the given methods fits your data, a fallback set of forecasting methods will be used instead.
    forecasting_methods_per_hierarchy_level: builtins.dict[builtins.int, builtins.list[builtins.str]]
        Mapping from hierarchy level to list of forecasting methods.
        Keys represent the hierarchy depth, where 0 denotes the global level.
        If specified, allows different methods to be used at different hierarchy levels.
        For hierarchy levels not explicitly configured here, `forecasting_methods` is used by default.
        This setting does not influence the fallback pipeline.
    phase_out_fc_methods: typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
        List of methods that will be used to forecast phase-out time series.
        Phase-out detection must be enabled in preprocessing configuration to take effect.
    """

    number_iterations: Annotated[ValidatedPositiveInt, Field(ge=1, le=24)] = PositiveInt(12)
    shift_len: ValidatedPositiveInt = PositiveInt(1)
    refit: bool = False
    default_error_metric: Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape'] = 'mse'
    sporadic_error_metric: Literal['pis', 'sapis', 'acr', 'mar', 'msr'] = 'pis'
    additional_accuracy_measures: list[Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis',
                                               'acr', 'mar', 'msr']] = Field(default_factory=list)
    step_weights: Optional[dict[ValidatedPositiveInt, PositiveFloat]] = None

    additional_cov_method: Optional[AdditionalCovMethod] = None
    cov_combination: Literal['single', 'joint'] = 'single'
    forecasting_methods: Sequence[ForecastingMethods] = Field(default_factory=list)
    forecasting_methods_per_hierarchy_level: dict[int, Annotated[list[str],
                                                                 Field(min_length=1)]] = Field(default_factory=dict)
    phase_out_fc_methods: Sequence[ForecastingMethods] = Field(default_factory=lambda: ['ZeroForecast'])

    backtesting_strategy: Literal['standard', 'equal_coverage'] = 'standard'
    equal_coverage_size: Optional[ValidatedPositiveInt] = None

    @model_validator(mode='after')
    def shift_length_valid_when_equal_coverage_active(self) -> Self:
        if (self.shift_len != 1 and self.backtesting_strategy == 'equal_coverage'):
            raise ValueError('Equal-Coverage-Backtesting-Strategy only allows a shift length of 1.')
        return self

    @model_validator(mode='after')
    def step_weights_not_empty(self) -> Self:
        if self.step_weights is not None and len(self.step_weights) == 0:
            raise ValueError('Empty dictionary for step_weights is not allowed.')
        return self

Method selection configuration.

Parameters

number_iterations : PositiveInt
Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals.
shift_len : PositiveInt
Number of time points by which the test window is shifted between backtesting iterations.
backtesting_strategy : typing.Literal['standard', 'equal_coverage']
Selects the methodology for backtesting.
- 'standard': A standard rolling forecast. The evaluation window with fixed length is shifted at each step. This strategy is controlled by number_iterations and shift_len.
- 'equal_coverage': A balanced strategy that guarantees every data point within the equal_coverage_size is forecasted the same number of times. This strategy has specific requirements: it uses a shift_len of 1, and the number of iterations is calculated automatically based on the equal_coverage_size and forecast horizon, ignoring the number_iterations parameter.
equal_coverage_size : typing.Optional[PositiveInt]
Number of recent data points to test when backtesting_strategy equal_coverage is active. If None, or if the chosen length is too long, the most common season length for the time series granularity is used instead.
refit : builtins.bool
If true, then models are refitted for each backtesting iteration.
default_error_metric : typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']
Error metric applied to the backtesting error for non-sporadic time series.
sporadic_error_metric : typing.Literal['pis', 'sapis', 'acr', 'mar', 'msr']
Error metric applied to the backtesting errors for sporadic time series.
additional_accuracy_measures : list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]
Additional accuracy measures for reporting purposes only. They do not affect internal evaluation or model ranking.
step_weights : typing.Optional[builtins.dict[PositiveInt, builtins.float]]
Mapping from forecast steps to weights associated with forecast errors for that forecasting step.
- Purpose: Applied only on error metrics for non-sporadic time series.
- Weights: Only positive weights are allowed. If a forecast step is not included in the dictionary, it is assigned a weight of zero.
- Forecast Horizon: The highest key in this dictionary defines the forecast horizon for backtesting, if skip_empirical_prediction_intervals is set to True.
additional_cov_method : typing.Optional[typing.Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost']]

Define up to one additional method that uses the defined covariates for creating forecasts. Will not be calculated if deemed unfit by the preselection. If the parameter `forecasting_methods` is defined, the additional cov method must appear in that list, too.

Only available in Standard, Premium and Enterprise subscription packages.

cov_combination : typing.Literal['single', 'joint']

Create a forecast model for each individual covariate (single) or a model using all covariates together (joint).

single is only available in Standard, Premium and Enterprise subscription packages.

forecasting_methods : typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
Define specific forecasting methods to be tested for generating forecasts. Specifying fewer methods can significantly reduce the runtime of forecast creation. If not specified, all available forecasting methods will be used by default. Given methods are automatically preselected based on time series characteristics of your data. If none of the given methods fits your data, a fallback set of forecasting methods will be used instead.
forecasting_methods_per_hierarchy_level : builtins.dict[builtins.int, builtins.list[builtins.str]]
Mapping from hierarchy level to list of forecasting methods. Keys represent the hierarchy depth, where 0 denotes the global level. If specified, allows different methods to be used at different hierarchy levels. For hierarchy levels not explicitly configured here, forecasting_methods is used by default. This setting does not influence the fallback pipeline.
phase_out_fc_methods : typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
List of methods that will be used to forecast phase-out time series. Phase-out detection must be enabled in preprocessing configuration to take effect.
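The interplay of `number_iterations`, `shift_len`, and the forecast horizon in the standard backtesting strategy described above can be sketched in a few lines. This is a self-contained illustration, not the futureexpert implementation; the exact index arithmetic (newest window last, shifting into the past) is an assumption made for exposition:

```python
def rolling_backtest_windows(n_obs: int, fc_horizon: int,
                             number_iterations: int, shift_len: int) -> list[tuple[int, int]]:
    """Return (train_end, test_end) index pairs for a standard rolling backtest.

    The first iteration tests the most recent `fc_horizon` points; each
    further iteration shifts the evaluation window `shift_len` points
    into the past.
    """
    windows = []
    for i in range(number_iterations):
        test_end = n_obs - i * shift_len      # exclusive end of the test window
        train_end = test_end - fc_horizon     # exclusive end of the training data
        if train_end <= 0:
            break                             # not enough history for more iterations
        windows.append((train_end, test_end))
    return windows

# e.g. 100 observations, horizon 12, 3 iterations, shift 1:
# [(88, 100), (87, 99), (86, 98)]
```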

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var additional_accuracy_measures : list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]
var additional_cov_method : Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost'] | None
var backtesting_strategy : Literal['standard', 'equal_coverage']
var cov_combination : Literal['single', 'joint']
var default_error_metric : Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']
var equal_coverage_size : PositiveInt | None
var forecasting_methods : Sequence[Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
var forecasting_methods_per_hierarchy_level : dict[int, list[str]]
var model_config
var number_iterations : PositiveInt
var phase_out_fc_methods : Sequence[Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
var refit : bool
var shift_len : PositiveInt
var sporadic_error_metric : Literal['pis', 'sapis', 'acr', 'mar', 'msr']
var step_weights : dict[PositiveInt, float] | None

Methods

def shift_length_valid_when_equal_coverage_active(self) ‑> Self
Expand source code
@model_validator(mode='after')
def shift_length_valid_when_equal_coverage_active(self) -> Self:
    if (self.shift_len != 1 and self.backtesting_strategy == 'equal_coverage'):
        raise ValueError('Equal-Coverage-Backtesting-Strategy only allows a shift length of 1.')
    return self
def step_weights_not_empty(self) ‑> Self
Expand source code
@model_validator(mode='after')
def step_weights_not_empty(self) -> Self:
    if self.step_weights is not None and len(self.step_weights) == 0:
        raise ValueError('Empty dictionary for step_weights is not allowed.')
    return self
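The `step_weights_not_empty` validator above rejects an empty mapping; the documented semantics of `step_weights` (missing steps get weight zero, only error metrics for non-sporadic series are affected) can be illustrated with a small self-contained sketch. The aggregation formula below is an assumption for exposition only, not the actual futureexpert computation:

```python
def weighted_step_error(step_errors: dict[int, float],
                        step_weights: dict[int, float]) -> float:
    """Combine per-step backtesting errors using step weights.

    Forecast steps absent from `step_weights` receive weight zero,
    mirroring the documented behaviour of the `step_weights` parameter.
    """
    total_weight = sum(step_weights.get(step, 0.0) for step in step_errors)
    if total_weight == 0:
        raise ValueError('At least one forecast step must carry a positive weight.')
    return sum(err * step_weights.get(step, 0.0)
               for step, err in step_errors.items()) / total_weight
```

With `step_weights = {1: 1.0, 2: 1.0}`, an error of 100.0 at step 3 is ignored entirely, because step 3 carries weight zero.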
class PreprocessingConfig (**data: Any)
Expand source code
class PreprocessingConfig(BaseConfig):
    """Preprocessing configuration.

    Parameters
    ----------
    remove_leading_zeros: builtins.bool
        If true, then leading zeros are removed from the time series before forecasting. Is only applied
        if the time series has at least 5 values, including missing values.
    use_season_detection: builtins.bool
        If true, then the season length is determined from the data.
    seasonalities_to_test: typing.Optional[builtins.list[typing.Union[builtins.list[futureexpert.shared_models.PositiveInt], futureexpert.shared_models.PositiveInt]]]
        Season lengths to be tested. If not defined, a suitable set for the given granularity is used.
        Season lengths can only be tested, if the number of observations is at least three times as
        long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should
        be considered, too. Allows a combination of single granularities or combinations of granularities.
    fixed_seasonalities: typing.Optional[builtins.list[futureexpert.shared_models.PositiveInt]]
        Season lengths used without checking. Allowed only if `use_season_detection` is false.
    detect_outliers: builtins.bool
        If true, then identifies outliers in the data.
    replace_outliers: builtins.bool
        If true, then identified outliers are replaced.
    detect_changepoints: builtins.bool
        If true, then change points such as level shifts are identified.
    detect_quantization: builtins.bool
        If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic
        time series data and, if one has been detected, applies it to the forecasts.
    phase_out_method: typing.Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS']
        Choose which method is used to detect phase-out in a time series, or turn it OFF.
        The TRAILING_ZEROS method uses the number of trailing zeros to detect phase-out.
        The AUTO_FEW_OBS method uses few-observation changepoints at the end of the time series to detect phase-out.
        AUTO_FEW_OBS is only allowed if `detect_changepoints` is true.
    num_trailing_zeros_for_phase_out: futureexpert.shared_models.PositiveInt
        Number of trailing zeros in the time series to detect phase-out with the TRAILING_ZEROS method.
    recent_trend_num_observations: typing.Optional[futureexpert.shared_models.PositiveInt]
        Number of observations which are included in time span used for recent trend detection.
    recent_trend_num_seasons: typing.Optional[futureexpert.shared_models.PositiveInt]
        Number of seasons which are included in time span used for recent trend detection.
        If both recent_trend_num_seasons and recent_trend_num_observations are set, the longer time span is used.
    """

    remove_leading_zeros: bool = False
    use_season_detection: bool = True
    # empty lists and None are treated the same in apollon
    seasonalities_to_test: Optional[list[Union[list[ValidatedPositiveInt], ValidatedPositiveInt]]] = None
    fixed_seasonalities: Optional[list[ValidatedPositiveInt]] = None
    detect_outliers: bool = False
    replace_outliers: bool = False
    detect_changepoints: bool = False
    detect_quantization: bool = False
    phase_out_method: Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS'] = 'OFF'
    num_trailing_zeros_for_phase_out: ValidatedPositiveInt = PositiveInt(5)
    recent_trend_num_observations: Optional[ValidatedPositiveInt] = PositiveInt(6)
    recent_trend_num_seasons: Optional[ValidatedPositiveInt] = PositiveInt(2)

    @model_validator(mode='after')
    def _has_no_fixed_seasonalities_if_uses_season_detection(self) -> Self:
        if self.use_season_detection and self.fixed_seasonalities:
            raise ValueError('If fixed seasonalities is enabled, then season detection must be off.')

        return self

    @model_validator(mode='after')
    def _has_detect_changepoints_if_phase_out_method_is_auto_few_obs(self) -> Self:
        if not self.detect_changepoints and self.phase_out_method == 'AUTO_FEW_OBS':
            raise ValueError('If phase_out_method is set to AUTO_FEW_OBS, then detect_changepoints must be on.')

        return self

    @model_validator(mode='after')
    def _has_no_recent_trend_num_observation_nor_num_seasons(self) -> Self:
        if not self.recent_trend_num_observations and not self.recent_trend_num_seasons:
            raise ValueError(
                'Both recent_trend_num_observations and recent_trend_num_seasons cannot be None at the same time.')

        return self

Preprocessing configuration.

Parameters

remove_leading_zeros : builtins.bool
If true, then leading zeros are removed from the time series before forecasting. Is only applied if the time series has at least 5 values, including missing values.
use_season_detection : builtins.bool
If true, then the season length is determined from the data.
seasonalities_to_test : typing.Optional[builtins.list[typing.Union[builtins.list[PositiveInt], PositiveInt]]]
Season lengths to be tested. If not defined, a suitable set for the given granularity is used. Season lengths can only be tested, if the number of observations is at least three times as long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should be considered, too. Allows a combination of single granularities or combinations of granularities.
fixed_seasonalities : typing.Optional[builtins.list[PositiveInt]]
Season lengths used without checking. Allowed only if use_season_detection is false.
detect_outliers : builtins.bool
If true, then identifies outliers in the data.
replace_outliers : builtins.bool
If true, then identified outliers are replaced.
detect_changepoints : builtins.bool
If true, then change points such as level shifts are identified.
detect_quantization : builtins.bool
If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic time series data and, if one has been detected, applies it to the forecasts.
phase_out_method : typing.Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS']
Choose which method is used to detect phase-out in a time series, or turn it OFF. The TRAILING_ZEROS method uses the number of trailing zeros to detect phase-out. The AUTO_FEW_OBS method uses few-observation changepoints at the end of the time series to detect phase-out. AUTO_FEW_OBS is only allowed if detect_changepoints is true.
num_trailing_zeros_for_phase_out : PositiveInt
Number of trailing zeros in the time series to detect phase-out with the TRAILING_ZEROS method.
recent_trend_num_observations : typing.Optional[PositiveInt]
Number of observations which are included in time span used for recent trend detection.
recent_trend_num_seasons : typing.Optional[PositiveInt]
Number of seasons which are included in time span used for recent trend detection. If both recent_trend_num_seasons and recent_trend_num_observations are set, the longer time span is used.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var detect_changepoints : bool
var detect_outliers : bool
var detect_quantization : bool
var fixed_seasonalities : list[PositiveInt] | None
var model_config
var num_trailing_zeros_for_phase_out : PositiveInt
var phase_out_method : Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS']
var recent_trend_num_observations : PositiveInt | None
var recent_trend_num_seasons : PositiveInt | None
var remove_leading_zeros : bool
var replace_outliers : bool
var seasonalities_to_test : list[list[PositiveInt] | PositiveInt] | None
var use_season_detection : bool
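The TRAILING_ZEROS phase-out detection described above reduces to a simple check on the tail of the series. This is a sketch of the documented idea, not the futureexpert implementation; requiring the series to be at least as long as the threshold is an assumption:

```python
def is_phase_out(values: list[float], num_trailing_zeros: int = 5) -> bool:
    """TRAILING_ZEROS-style phase-out check.

    Flags a series whose last `num_trailing_zeros` observations are all
    zero (default threshold 5, matching num_trailing_zeros_for_phase_out).
    """
    if len(values) < num_trailing_zeros:
        return False
    return all(v == 0 for v in values[-num_trailing_zeros:])
```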
class ReconciliationConfig (**data: Any)
Expand source code
class ReconciliationConfig(BaseConfig):
    """Configuration for hierarchical reconciliation process.

    Parameters
    ----------
    method: futureexpert._forecast_consistency_metadata.ReconciliationMethod
        Primary reconciliation method to use
    fallback_methods: builtins.list[futureexpert._forecast_consistency_metadata.ReconciliationMethod]
        List of fallback methods to try if primary method fails
    excluded_levels: builtins.list[builtins.str]
        Set of hierarchy levels to exclude from reconciliation
    actuals_period_length: typing.Optional[builtins.int]
        Number of last datapoints from actuals to use for proportion calculation (None = all)
    forecast_period_length: typing.Optional[builtins.int]
        Number of datapoints from forecasts to use for proportion calculation (None = all)
    round_forecast_to_integer: builtins.bool
        If True, apply integer rounding constraint after reconciliation to ensure all
        forecast values are integers while preserving total sum and hierarchical consistency
    round_forecast_to_package_size: builtins.bool
        If True, apply package size rounding constraint after reconciliation to ensure all
        forecast values are multiples of time series specific package sizes.
        Cannot be combined with round_forecast_to_integer.
    enforce_forecast_minimum_constraint: builtins.bool
        If True, enforce forecast minimums from open orders or contractual obligations.
        Only available if round_forecast_to_package_size is active.
    """
    method: ReconciliationMethod = ReconciliationMethod.BOTTOM_UP
    fallback_methods: list[ReconciliationMethod] = Field(default_factory=list)
    excluded_levels: list[str] = Field(default_factory=list)
    actuals_period_length: Optional[int] = None
    forecast_period_length: Optional[int] = None
    round_forecast_to_integer: bool = False
    round_forecast_to_package_size: bool = False
    enforce_forecast_minimum_constraint: bool = False

    @model_validator(mode='after')
    def check_package_size_and_integer_rounding_exclusivity(self) -> ReconciliationConfig:
        """Validates that package size rounding and integer rounding cannot be used together.

    Returns
    -------
    futureexpert.forecast_consistency.ReconciliationConfig
    """
        if self.round_forecast_to_package_size and self.round_forecast_to_integer:
            raise ValueError(
                'round_forecast_to_package_size and round_forecast_to_integer cannot both be True. '
                'Package size rounding takes precedence and already enforces integer values.'
            )
        return self

    @model_validator(mode='after')
    def check_package_size_dependency(self) -> ReconciliationConfig:
        """Validates that package size rounding is active if minimum constraints are enforced.

    Returns
    -------
    futureexpert.forecast_consistency.ReconciliationConfig
    """
        if self.enforce_forecast_minimum_constraint and not self.round_forecast_to_package_size:
            raise ValueError(
                'enforce_forecast_minimum_constraint can only be True '
                'if round_forecast_to_package_size is also True.'
            )
        return self

Configuration for hierarchical reconciliation process.

Parameters

method : futureexpert._forecast_consistency_metadata.ReconciliationMethod
Primary reconciliation method to use
fallback_methods : builtins.list[futureexpert._forecast_consistency_metadata.ReconciliationMethod]
List of fallback methods to try if primary method fails
excluded_levels : builtins.list[builtins.str]
Set of hierarchy levels to exclude from reconciliation
actuals_period_length : typing.Optional[builtins.int]
Number of last datapoints from actuals to use for proportion calculation (None = all)
forecast_period_length : typing.Optional[builtins.int]
Number of datapoints from forecasts to use for proportion calculation (None = all)
round_forecast_to_integer : builtins.bool
If True, apply integer rounding constraint after reconciliation to ensure all forecast values are integers while preserving total sum and hierarchical consistency
round_forecast_to_package_size : builtins.bool
If True, apply package size rounding constraint after reconciliation to ensure all forecast values are multiples of time series specific package sizes. Cannot be combined with round_forecast_to_integer.
enforce_forecast_minimum_constraint : builtins.bool
If True, enforce forecast minimums from open orders or contractual obligations. Only available if round_forecast_to_package_size is active.
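The package-size rounding constraint described above can be sketched as nearest-multiple rounding. A minimal illustration of the documented idea: the real reconciliation additionally preserves the total sum and hierarchical consistency, which this sketch ignores:

```python
def round_to_package_size(forecast: float, package_size: float) -> float:
    """Round a forecast value to the nearest multiple of the package size.

    With package_size=1 this degenerates to plain integer rounding, which
    is why the two constraints cannot be combined.
    """
    if package_size <= 0:
        raise ValueError('package_size must be positive')
    return round(forecast / package_size) * package_size
```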

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var actuals_period_length : int | None
var enforce_forecast_minimum_constraint : bool
var excluded_levels : list[str]
var fallback_methods : list[futureexpert._forecast_consistency_metadata.ReconciliationMethod]
var forecast_period_length : int | None
var method : futureexpert._forecast_consistency_metadata.ReconciliationMethod
var model_config
var round_forecast_to_integer : bool
var round_forecast_to_package_size : bool

Methods

def check_package_size_and_integer_rounding_exclusivity(self) ‑> ReconciliationConfig
Expand source code
@model_validator(mode='after')
def check_package_size_and_integer_rounding_exclusivity(self) -> ReconciliationConfig:
    """Validates that package size rounding and integer rounding cannot be used together.

Returns
-------
futureexpert.forecast_consistency.ReconciliationConfig
"""
    if self.round_forecast_to_package_size and self.round_forecast_to_integer:
        raise ValueError(
            'round_forecast_to_package_size and round_forecast_to_integer cannot both be True. '
            'Package size rounding takes precedence and already enforces integer values.'
        )
    return self

Validates that package size rounding and integer rounding cannot be used together.

Returns

ReconciliationConfig
 
def check_package_size_dependency(self) ‑> ReconciliationConfig
Expand source code
@model_validator(mode='after')
def check_package_size_dependency(self) -> ReconciliationConfig:
    """Validates that package size rounding is active if minimum constraints are enforced.

Returns
-------
futureexpert.forecast_consistency.ReconciliationConfig
"""
    if self.enforce_forecast_minimum_constraint and not self.round_forecast_to_package_size:
        raise ValueError(
            'enforce_forecast_minimum_constraint can only be True '
            'if round_forecast_to_package_size is also True.'
        )
    return self

Validates that package size rounding is active if minimum constraints are enforced.

Returns

ReconciliationConfig
 
class ReconciliationMethod (*args, **kwds)
Expand source code
class ReconciliationMethod(str, Enum):
    """Reconciliation methods for hierarchical forecasting."""

    BOTTOM_UP = 'bottom_up'
    """Sums forecasts from the base level of the hierarchy up to the top.

    Uses `hierarchicalforecast.methods.BottomUp`.
    """

    TOP_DOWN_PROPORTION_AVERAGES = 'top_down_proportion_averages'
    """Disaggregates the top-level forecast based on historical average proportions.

    Uses `hierarchicalforecast.methods.TopDown(method='proportion_averages')`.
    """

    TOP_DOWN_FORECAST_PROPORTION = 'top_down_forecast_proportion'
    """Disaggregates the top-level forecast based on the proportions of the base forecasts for each forecast step.

    Uses `hierarchicalforecast.methods.TopDown(method='forecast_proportions')`.
    """

    TOP_DOWN_AVERAGE_FORECAST_PROPORTION = 'top_down_average_forecast_proportion'
    """Disaggregates the top-level forecast based on the average proportions of the base forecasts over the horizon."""

    MIN_TRACE_WLS_STRUCT = 'min_trace_wls_struct'
    """Weights are based on the number of aggregated base series (Structural).

    Uses `hierarchicalforecast.methods.MinTrace(method='wls_struct')`.
    """

Reconciliation methods for hierarchical forecasting.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var BOTTOM_UP

Sums forecasts from the base level of the hierarchy up to the top.

Uses hierarchicalforecast.methods.BottomUp.

var MIN_TRACE_WLS_STRUCT

Weights are based on the number of aggregated base series (Structural).

Uses hierarchicalforecast.methods.MinTrace(method='wls_struct').

var TOP_DOWN_AVERAGE_FORECAST_PROPORTION

Disaggregates the top-level forecast based on the average proportions of the base forecasts over the horizon.

var TOP_DOWN_FORECAST_PROPORTION

Disaggregates the top-level forecast based on the proportions of the base forecasts for each forecast step.

Uses hierarchicalforecast.methods.TopDown(method='forecast_proportions').

var TOP_DOWN_PROPORTION_AVERAGES

Disaggregates the top-level forecast based on historical average proportions.

Uses hierarchicalforecast.methods.TopDown(method='proportion_averages').

class ReportConfig (**data: Any)
Expand source code
class ReportConfig(BaseConfig):
    """Forecast run configuration.

    Parameters
    ----------
    matcher_report_id: typing.Optional[builtins.int]
        Report ID of the covariate matcher.
    covs_versions: builtins.list[builtins.str]
        List of versions of the covariates.
    covs_configuration: typing.Optional[builtins.list[futureexpert.matcher.ActualsCovsConfiguration]]
        Mapping from actuals and covariates. Use for custom covariate or adjusted matcher results.
        If the matcher results should be used without changes use `matcher_report_id` instead.
    title: builtins.str
        Title of the report.
    actuals_filter: builtins.dict[builtins.str, typing.Any]
        Filter criterion for actuals time series. The given actuals version is
        automatically added as an additional filter criterion. Possible filter criteria are all fields that are part
        of the TimeSeries class, e.g. {'name': 'Sales'}.
        For more complex filters, check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    max_ts_len: typing.Optional[builtins.int]
        At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG
        for allowed configuration.
    preprocessing: futureexpert.forecast.PreprocessingConfig
        Preprocessing configuration.
    forecasting: futureexpert.forecast.ForecastingConfig
        Forecasting configuration.
    method_selection: typing.Optional[futureexpert.forecast.MethodSelectionConfig]
        Method selection configuration. If not supplied, then a granularity dependent default is used.
    pool_covs: typing.Optional[builtins.list[futureexpert.pool.PoolCovDefinition]]
        List of covariate definitions. Only available in `Standard`, `Premium` and `Enterprise` subscription packages.
    rerun_report_id: typing.Optional[builtins.int]
        ReportId from which failed runs should be recomputed.
        Make sure to use the same ts_version; otherwise, all time series are computed again.
    rerun_status: list[typing.Literal['Error', 'NoEvaluation']]
        Status of the runs that should be computed again. `Error` and/or `NoEvaluation`.
    db_name: typing.Optional[builtins.str]
        Only accessible for internal use. Name of the database to use for storing the results.
    priority: typing.Optional[builtins.int]
        Only accessible for internal use. Higher values indicate higher priority.
    """

    title: str
    forecasting: ForecastingConfig
    matcher_report_id: Optional[int] = None
    covs_versions: list[str] = Field(default_factory=list)
    covs_configuration: Optional[list[ActualsCovsConfiguration]] = None
    actuals_filter: dict[str, Any] = Field(default_factory=dict)
    max_ts_len: Optional[int] = None
    preprocessing: PreprocessingConfig = PreprocessingConfig()
    pool_covs: Optional[list[PoolCovDefinition]] = None
    method_selection: Optional[MethodSelectionConfig] = None
    rerun_report_id: Optional[int] = None
    rerun_status: list[RerunStatus] = ['Error']
    db_name:  Optional[str] = None
    priority: Annotated[Optional[int], Field(ge=0, le=10)] = None

    @model_validator(mode='after')
    def _correctness_of_cov_configurations(self) -> Self:
        if (self.matcher_report_id or self.covs_configuration) and (
                len(self.covs_versions) == 0 and self.pool_covs is None):
            raise ValueError(
                'If `matcher_report_id` or `covs_configuration` is set, `covs_versions` must also be set.')
        if (self.matcher_report_id is None and self.covs_configuration is None) and (
                self.covs_versions or self.pool_covs):
            raise ValueError(
                'If `covs_versions` or `pool_covs` is set ' +
                'either `matcher_report_id` or `covs_configuration` needs to be set.')
        if self.covs_configuration is not None and len(self.covs_configuration) == 0:
            raise ValueError('`covs_configuration` has length zero and therefore won\'t have any effect. '
                             'Please remove the parameter or set it to None.')
        return self

    @model_validator(mode='after')
    def _only_one_covariate_definition(self) -> Self:
        fields = [
            'matcher_report_id',
            'pool_covs'
        ]

        set_fields = [field for field in fields if getattr(self, field) is not None]

        if len(set_fields) > 1:
            raise ValueError(f'Only one of {", ".join(fields)} can be set. Found: {", ".join(set_fields)}')

        return self

    @model_validator(mode='after')
    def _backtesting_step_weights_refer_to_valid_forecast_steps(self) -> Self:
        if (self.method_selection
            and self.method_selection.step_weights
                and max(self.method_selection.step_weights.keys()) > self.forecasting.fc_horizon):
            raise ValueError('Step weights must not refer to forecast steps beyond the fc_horizon.')

        return self

    @model_validator(mode='after')
    def _valid_covs_version(self) -> Self:
        for covs_version in self.covs_versions:
            if re.match('^[0-9a-f]{24}$', covs_version) is None:
                raise ValueError(f'Given covs_version "{covs_version}" is not a valid ObjectId.')
        return self

    @model_validator(mode='after')
    def _has_valid_phase_out_detection_method_if_phase_out_fc_method_was_changed(self) -> Self:
        if ((self.method_selection and self.method_selection.phase_out_fc_methods != ['ZeroForecast']) and
                self.preprocessing.phase_out_method == 'OFF'):
            # A warning is logged instead of raising an error since this does not cause downstream issues.
            # The user is informed that their changes to phase_out_fc_methods have no effect
            # to clarify the relationship between these settings.
            logger.warning('Phase-out detection must be enabled in PreprocessingConfig'
                           ' so changes in phase_out_fc_methods in MethodSelectionConfig take effect.')
        return self

    @model_validator(mode='after')
    def _has_non_empty_phase_out_fc_method_if_phase_out_detection_is_on(self) -> Self:
        if (self.method_selection and
                not self.method_selection.phase_out_fc_methods and
                self.preprocessing.phase_out_method != 'OFF'):
            raise ValueError('Phase out forecasting method cannot be empty when phase out detection is enabled.')

        return self

Forecast run configuration.

Parameters

matcher_report_id : typing.Optional[builtins.int]
Report ID of the covariate matcher.
covs_versions : builtins.list[builtins.str]
List of versions of the covariates.
covs_configuration : typing.Optional[builtins.list[ActualsCovsConfiguration]]
Mapping between actuals and covariates. Use for custom covariates or adjusted matcher results. If the matcher results should be used without changes, use matcher_report_id instead.
title : builtins.str
Title of the report.
actuals_filter : builtins.dict[builtins.str, typing.Any]
Filter criterion for actuals time series. The given actuals version is automatically added as an additional filter criterion. Possible filter criteria are all fields that are part of the TimeSeries class, e.g. {'name': 'Sales'}. For more complex filters, see: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len : typing.Optional[builtins.int]
At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
preprocessing : PreprocessingConfig
Preprocessing configuration.
forecasting : ForecastingConfig
Forecasting configuration.
method_selection : typing.Optional[MethodSelectionConfig]
Method selection configuration. If not supplied, a granularity-dependent default is used.
pool_covs : typing.Optional[builtins.list[PoolCovDefinition]]
List of covariate definitions. Only available in Standard, Premium and Enterprise subscription packages.
rerun_report_id : typing.Optional[builtins.int]
ReportId from which failed runs should be recomputed. Make sure to use the same ts_version; otherwise, all time series are computed again.
rerun_status : list[typing.Literal['Error', 'NoEvaluation']]
Status of the runs that should be computed again. Error and/or NoEvaluation.
db_name : typing.Optional[builtins.str]
Only accessible for internal use. Name of the database to use for storing the results.
priority : typing.Optional[builtins.int]
Only accessible for internal use. Higher values indicate higher priority.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var actuals_filter : dict[str, typing.Any]
var covs_configuration : list[ActualsCovsConfiguration] | None
var covs_versions : list[str]
var db_name : str | None
var forecasting : ForecastingConfig
var matcher_report_id : int | None
var max_ts_len : int | None
var method_selection : MethodSelectionConfig | None
var model_config
var pool_covs : list[PoolCovDefinition] | None
var preprocessing : PreprocessingConfig
var priority : int | None
var rerun_report_id : int | None
var rerun_status : list[typing.Literal['Error', 'NoEvaluation']]
var title : str
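The validators shown in the source above enforce, among other things, that every entry of `covs_versions` is a 24-character hexadecimal MongoDB ObjectId and that `step_weights` keys never exceed the forecast horizon. A minimal standalone sketch of these two checks (plain functions for illustration; not part of the futureexpert API):

```python
import re


def is_valid_covs_version(covs_version: str) -> bool:
    """Mirror the regex check from `_valid_covs_version`."""
    return re.match('^[0-9a-f]{24}$', covs_version) is not None


def step_weights_within_horizon(step_weights: dict[int, float], fc_horizon: int) -> bool:
    """Mirror `_backtesting_step_weights_refer_to_valid_forecast_steps`:
    no weighted step may lie beyond the forecast horizon."""
    return not step_weights or max(step_weights.keys()) <= fc_horizon


print(is_valid_covs_version('5f4e7c9b8a1d2e3f4a5b6c7d'))            # True
print(is_valid_covs_version('not-an-object-id'))                    # False
print(step_weights_within_horizon({1: 0.5, 3: 0.5}, fc_horizon=2))  # False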
class TrendDetectionConfiguration (**data: Any)
Expand source code
class TrendDetectionConfiguration(BaseConfig):
    """Configuration for trend detection.

    Parameters
    ----------
    end_time: typing.Optional[datetime.datetime]
        End (inclusive) of the time span used for trend detection.
    max_number_of_obs: builtins.int
        Width of the time span used for trend detection; (leading and trailing) missing values
        are disregarded, that is, at most this number of observations are used for a given time series.
    number_of_nans_tolerated: builtins.int
        Leading and trailing missing values are dropped prior to running the trend detection; if
        this results in more than this number of observations being dropped, the trend is
        considered undetermined.
    """
    end_time: Optional[datetime] = None
    max_number_of_obs: int = Field(default=6, gt=0)
    number_of_nans_tolerated: int = Field(default=2, ge=0)

Configuration for trend detection.

Parameters

end_time : typing.Optional[datetime.datetime]
End (inclusive) of the time span used for trend detection.
max_number_of_obs : builtins.int
Width of the time span used for trend detection; (leading and trailing) missing values are disregarded, that is, at most this number of observations are used for a given time series.
number_of_nans_tolerated : builtins.int
Leading and trailing missing values are dropped prior to running the trend detection; if this results in more than this number of observations being dropped, the trend is considered undetermined.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var end_time : datetime.datetime | None
var max_number_of_obs : int
var model_config
var number_of_nans_tolerated : int
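The documented semantics of `max_number_of_obs` and `number_of_nans_tolerated` can be sketched as follows. This is an illustrative reimplementation of the described behavior, not the library's code; the function name is hypothetical:

```python
import math
from typing import Optional


def usable_window(values: list[float], max_number_of_obs: int = 6,
                  number_of_nans_tolerated: int = 2) -> Optional[list[float]]:
    """Return the trimmed observation window, or None if the trend is undetermined."""
    window = values[-max_number_of_obs:]  # at most this many recent observations
    start, end = 0, len(window)
    # drop leading and trailing NaNs only (interior NaNs are kept)
    while start < end and math.isnan(window[start]):
        start += 1
    while end > start and math.isnan(window[end - 1]):
        end -= 1
    lost = len(window) - (end - start)
    if lost > number_of_nans_tolerated:
        return None  # too many observations lost: trend undetermined
    return window[start:end]


nan = float('nan')
print(usable_window([nan, 1.0, 2.0, 3.0]))       # [1.0, 2.0, 3.0]
print(usable_window([nan, nan, nan, 1.0, 2.0]))  # None (3 observations lost > 2 tolerated)
```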
class TsCreationConfig (**data: Any)
Expand source code
class TsCreationConfig(BaseConfig):
    """Configuration for the creation of time series.

    Parameters
    ----------
    value_columns_to_save: builtins.list[builtins.str]
        Value columns that should be saved.
    time_granularity: typing.Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
        Target granularity of the time series.
    description: typing.Optional[builtins.str]
        A short description of the time series.
    start_date: typing.Optional[builtins.str]
        Dates before this date are excluded.
    end_date: typing.Optional[builtins.str]
        Dates after this date are excluded.
    grouping_level: builtins.list[builtins.str]
        Names of group columns that should be used as the grouping level.
    save_hierarchy: builtins.bool
        If true, interprets the given grouping levels as levels of a hierarchy and saves all
        hierarchy levels. Otherwise, no hierarchy levels are implied and only the single level
        with the given grouping is saved. E.g. if grouping_level is ['A', 'B', 'C'], time series
        for the groupings 'A', 'AB' and 'ABC' are saved.
        For later filtering use {'grouping.A': {'$exists': True}}
    filter: builtins.list[futureexpert.checkin.FilterSettings]
        Settings for including or excluding values during time series creation.
    new_variables: builtins.list[futureexpert.checkin.NewValue]
        New value columns, each a combination of two other value columns.
    missing_value_handler: typing.Literal['keepNaN', 'setToZero']
        Strategy how to handle missing values during time series creation.
    """
    value_columns_to_save: list[str]
    time_granularity: Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
    description: Optional[str] = None
    grouping_level: list[str] = []
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    save_hierarchy: bool = False
    filter: list[FilterSettings] = []
    new_variables: list[NewValue] = []
    missing_value_handler: Literal['keepNaN', 'setToZero'] = 'keepNaN'

Configuration for the creation of time series.

Parameters

value_columns_to_save : builtins.list[builtins.str]
Value columns that should be saved.
time_granularity : typing.Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
Target granularity of the time series.
description : typing.Optional[builtins.str]
A short description of the time series.
start_date : typing.Optional[builtins.str]
Dates before this date are excluded.
end_date : typing.Optional[builtins.str]
Dates after this date are excluded.
grouping_level : builtins.list[builtins.str]
Names of group columns that should be used as the grouping level.
save_hierarchy : builtins.bool
If true, interprets the given grouping levels as levels of a hierarchy and saves all hierarchy levels. Otherwise, no hierarchy levels are implied and only the single level with the given grouping is saved. E.g. if grouping_level is ['A', 'B', 'C'], time series for the groupings 'A', 'AB' and 'ABC' are saved. For later filtering use {'grouping.A': {'$exists': True}}
filter : builtins.list[FilterSettings]
Settings for including or excluding values during time series creation.
new_variables : builtins.list[NewValue]
New value columns, each a combination of two other value columns.
missing_value_handler : typing.Literal['keepNaN', 'setToZero']
Strategy how to handle missing values during time series creation.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var description : str | None
var end_date : str | None
var filter : list[FilterSettings]
var grouping_level : list[str]
var missing_value_handler : Literal['keepNaN', 'setToZero']
var model_config
var new_variables : list[NewValue]
var save_hierarchy : bool
var start_date : str | None
var time_granularity : Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
var value_columns_to_save : list[str]
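The `save_hierarchy` behavior described above can be sketched with a small helper. This is an illustrative function, not part of the futureexpert API: with `grouping_level=['A', 'B', 'C']` and `save_hierarchy=True`, time series are saved for the groupings 'A', 'AB' and 'ABC'; with `save_hierarchy=False`, only the single full grouping is saved.

```python
def saved_groupings(grouping_level: list[str], save_hierarchy: bool) -> list[str]:
    """Hypothetical helper: list the groupings for which time series would be saved."""
    if save_hierarchy:
        # every prefix of the grouping levels becomes its own hierarchy level
        return [''.join(grouping_level[:i]) for i in range(1, len(grouping_level) + 1)]
    # no hierarchy implied: only the single level with the full grouping
    return [''.join(grouping_level)]


print(saved_groupings(['A', 'B', 'C'], save_hierarchy=True))   # ['A', 'AB', 'ABC']
print(saved_groupings(['A', 'B', 'C'], save_hierarchy=False))  # ['ABC']
```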