Package futureexpert

Sub-modules

futureexpert.checkin

Contains the models with the configuration for CHECK-IN.

futureexpert.expert_client

Client for connecting with future.

futureexpert.forecast

Contains the models with the configuration for the forecast and the result format.

futureexpert.matcher

Contains the models with the configuration for the matcher and the result format.

futureexpert.plot

Contains all the functionality to plot the checked in time series and the forecast and backtesting results.

futureexpert.pool
futureexpert.shared_models

Shared models used across multiple modules.

Classes

class ActualsCovsConfiguration (**data: Any)
Expand source code
class ActualsCovsConfiguration(BaseModel):
    """Configuration of actuals and covariates via name and lag.

    Parameters
    ----------
    actuals_name: builtins.str
        Name of the time series.
    covs_configurations: builtins.list
        List of Covariates.
    """
    actuals_name: str
    covs_configurations: list[CovariateRef]

Configuration of actuals and covariates via name and lag.

Parameters

actuals_name : builtins.str
Name of the time series.
covs_configurations : builtins.list
List of Covariates.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var actuals_name : str
var covs_configurations : list[CovariateRef]
var model_config
class CovariateRef (**data: Any)
Expand source code
class CovariateRef(BaseModel):
    """Covariate reference.

    Parameters
    ----------
    name: builtins.str
        Name of the Covariate
    lag: builtins.int
        Lag by which the covariate was used.
    """
    name: str
    lag: int

Covariate reference.

Parameters

name : builtins.str
Name of the Covariate
lag : builtins.int
Lag by which the covariate was used.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var lag : int
var model_config
var name : str
class DataDefinition (**data: Any)
Expand source code
class DataDefinition(BaseConfig):
    """Model for the input parameter needed for the first CHECK-IN step.

    Parameters
    ----------
    remove_rows: typing.Optional
        Indexes of the rows to be removed before validation. Note: If the raw data was committed as pandas data frame
        the header is the first row (row index 0).
    remove_columns: typing.Optional
        Indexes of the columns to be removed before validation.
    date_columns: futureexpert.checkin.DateColumn
        Definition of the date column.
    value_columns: builtins.list
        Definitions of the value columns.
    group_columns: builtins.list
        Definitions of the group columns.
    """
    remove_rows: Optional[list[int]] = []
    remove_columns: Optional[list[int]] = []
    date_columns: DateColumn
    value_columns: list[ValueColumn]
    group_columns: list[GroupColumn] = []

Model for the input parameter needed for the first CHECK-IN step.

Parameters

remove_rows : typing.Optional
Indexes of the rows to be removed before validation. Note: If the raw data was committed as pandas data frame the header is the first row (row index 0).
remove_columns : typing.Optional
Indexes of the columns to be removed before validation.
date_columns : DateColumn
Definition of the date column.
value_columns : builtins.list
Definitions of the value columns.
group_columns : builtins.list
Definitions of the group columns.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var date_columnsDateColumn
var group_columns : list[GroupColumn]
var model_config
var remove_columns : list[int] | None
var remove_rows : list[int] | None
var value_columns : list[ValueColumn]
class ExpertClient (user: Optional[str] = None,
password: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None)
Expand source code
class ExpertClient:
    """FutureEXPERT client."""

    def __init__(self,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 group: Optional[str] = None,
                 environment: Optional[Literal['production', 'staging', 'development']] = None) -> None:
        """Initializer.

        Parameters
        ----------
        user: typing.Optional
            The username for the _future_ platform.
            If not provided, the username is read from environment variable FUTURE_USER.
        password: typing.Optional
            The password for the _future_ platform.
            If not provided, the password is read from environment variable FUTURE_PW.
        group: typing.Optional
            Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups.
            If not provided, the group is read from the environment variable FUTURE_GROUP.
        environment: typing.Optional
            Optionally the _future_ environment to be used, defaults to production environment.
            If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
        """
        try:
            future_user = user or os.environ['FUTURE_USER']
        except KeyError:
            raise MissingCredentialsError('username') from None
        try:
            future_password = password or os.environ['FUTURE_PW']
        except KeyError:
            raise MissingCredentialsError('password') from None
        future_group = group or os.getenv('FUTURE_GROUP')
        future_env = cast(Literal['production', 'staging', 'development'],
                          environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')

        self.client = FutureApiClient(user=future_user, password=future_password, environment=future_env)

        authorized_groups = self.client.userinfo['groups']
        if future_group is None and len(authorized_groups) != 1:
            raise ValueError(
                f'You have access to multiple groups. Please select one of the following: {authorized_groups}')
        self.switch_group(new_group=future_group or authorized_groups[0],
                          verbose=future_group is not None)
        self.is_analyst = 'analyst' in self.client.user_roles
        self.forecast_core_id = 'forecast-batch-internal' if self.is_analyst else 'forecast-batch'

    @staticmethod
    def from_dotenv() -> ExpertClient:
        """Create an instance from a .env file or environment variables."""
        dotenv.load_dotenv()
        return ExpertClient()

    def switch_group(self, new_group: str, verbose: bool = True) -> None:
        """Switches the current group.

        Parameters
        ----------
        new_group: builtins.str
            The name of the group to activate.
        verbose: builtins.bool
            If enabled, shows the group name in the log message.
        """
        if new_group not in self.client.userinfo['groups']:
            raise RuntimeError(f'You are not authorized to access group {new_group}')
        self.group = new_group
        verbose_text = f' for group {self.group}' if verbose else ''
        logger.info(f'Successfully logged in{verbose_text}.')

    def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any:
        """Uploads the given raw data for further processing.

        Parameters
        ----------
        source: typing.Union
            Path to a CSV file or a pandas data frame.
        file_specification: typing.Optional
            If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones.
            The parameter has no effect if source is a path to a CSV file.

        Returns
        -------
        Identifier for the user Inputs.
        """
        df_file = None
        if isinstance(source, pd.DataFrame):
            if not file_specification:
                file_specification = FileSpecification()
            csv = source.to_csv(index=False, sep=file_specification.delimiter,
                                decimal=file_specification.decimal, encoding='utf-8-sig')
            time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
            df_file = (f'expert-{time_stamp}.csv', csv)
            path = None
        else:
            path = source

        # TODO: currently only one file is supported here.
        upload_feedback = self.client.upload_user_inputs_for_group(
            self.group, path, df_file)

        return upload_feedback

    def check_data_definition(self,
                              user_input_id: str,
                              file_uuid: str,
                              data_definition: DataDefinition,
                              file_specification: FileSpecification = FileSpecification()) -> Any:
        """Checks the data definition.

        Removes specified rows and columns. Checks if column values have any issues.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.
        """
        payload = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)

        logger.info('Started data definition using CHECK-IN...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished data definition.')
        return result

    def create_time_series(self,
                           user_input_id: str,
                           file_uuid: str,
                           data_definition: Optional[DataDefinition] = None,
                           config_ts_creation: Optional[TsCreationConfig] = None,
                           config_checkin: Optional[str] = None,
                           file_specification: FileSpecification = FileSpecification()) -> Any:
        """Last step of the CHECK-IN process which creates the time series.

        Aggregates the data and saves them to the database.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.
        config_ts_creation: typing.Optional
            Configuration for the time series creation.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the _future_ frontend (now.future-forecasting.de).
        """
        logger.info('Transforming input data...')

        if config_ts_creation is None and config_checkin is None:
            raise ValueError('No configuration source is provided.')

        if config_ts_creation is not None and config_checkin is not None:
            raise ValueError('Only one configuration source can be processed.')

        if config_checkin is None and (data_definition is None or config_ts_creation is None):
            raise ValueError(
                'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.')

        if config_ts_creation is not None and data_definition is not None:
            payload_1 = self._create_checkin_payload_1(
                user_input_id, file_uuid, data_definition, file_specification)
            payload = self._create_checkin_payload_2(payload_1, config_ts_creation)
        if config_checkin is not None:
            payload = self._build_payload_from_ui_config(
                user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)

        logger.info('Creating time series using CHECK-IN...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished time series creation.')

        return result

    def check_in_pool_covs(self,
                           requested_pool_covs: list[PoolCovDefinition]) -> CheckInPoolResult:
        """Create a new version from a list of pool covariates and version ids.

        Parameters
        ----------
        requested_pool_covs: builtins.list
            List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id.
            If no version id is provided, the newest version of the covariate is used.

        Returns
        -------
        Result object with fields version_id and pool_cov_information.
        """
        logger.info('Transforming input data...')

        payload = {
            'payload': {
                'requested_indicators': [
                    {**covariate.model_dump(exclude_none=True),
                     'indicator_id': covariate.pool_cov_id}
                    for covariate in requested_pool_covs
                ]
            }
        }
        for covariate in payload['payload']['requested_indicators']:
            covariate.pop('pool_cov_id', None)

        logger.info('Creating time series using checkin-pool...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-pool',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

        logger.info('Finished time series creation.')

        return CheckInPoolResult(**result['result'])

    def get_pool_cov_overview(self,
                              granularity: Optional[str] = None,
                              search: Optional[str] = None) -> PoolCovOverview:
        """Gets an overview of all covariates available on POOL according to the given filters.

        Parameters
        ----------
        granularity: typing.Optional
            If set, returns only data matching that granularity (Day or Month).
        search: typing.Optional
            If set, performs a full-text search and only returns data found in that search.

        Returns
        -------
        PoolCovOverview object with tables containing the covariates with
        different levels of detail .
        """
        response_json = self.client.get_pool_cov_overview(granularity=granularity, search=search)
        return PoolCovOverview(response_json)

    def check_in_time_series(self,
                             raw_data_source: Union[pd.DataFrame, str],
                             data_definition: Optional[DataDefinition] = None,
                             config_ts_creation: Optional[TsCreationConfig] = None,
                             config_checkin: Optional[str] = None,
                             file_specification: FileSpecification = FileSpecification()) -> CheckInResult:
        """Checks in time series data that can be used as actuals or covariate data.

        Parameters
        ----------
        raw_data_source: typing.Union
            Data frame that contains the raw data or path to where the CSV file with the data is stored.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns are to be removed.
        config_ts_creation: typing.Optional
            Defines filter and aggreagtion level of the time series.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        Id of the time series version. Used to identifiy the time series and the values of the time series.
        """
        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        response = self.create_time_series(user_input_id=user_input_id,
                                           file_uuid=file_id,
                                           data_definition=data_definition,
                                           config_ts_creation=config_ts_creation,
                                           config_checkin=config_checkin,
                                           file_specification=file_specification)

        result = [TimeSeries(**ts) for ts in response['result']['timeSeries']]
        return CheckInResult(time_series=result,
                             version_id=response['result']['tsVersion']['_id'])

    def _create_checkin_payload_1(self, user_input_id: str,
                                  file_uuid: str,
                                  data_definition: DataDefinition,
                                  file_specification: FileSpecification = FileSpecification()) -> Any:
        """Creates the payload for the CHECK-IN stage prepareDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Specify the format of the CSV file. Only relevant if a CSV was given as input.
        """

        return {'userInputId': user_input_id,
                'payload': {
                    'stage': 'prepareDataset',
                    'fileUuid': file_uuid,
                    'meta': file_specification.model_dump(),
                    'performedTasks': {
                        'removedRows': data_definition.remove_rows,
                        'removedCols': data_definition.remove_columns
                    },
                    'columnDefinition': {
                        'dateColumns': [{snake_to_camel(key): value for key, value in
                                        data_definition.date_columns.model_dump(exclude_none=True).items()}],
                        'valueColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()}
                                         for d in data_definition.value_columns],
                        'groupColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()}
                                         for d in data_definition.group_columns]
                    }
                }}

    def _build_payload_from_ui_config(self, user_input_id: str, file_uuid: str, path: str) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        path: builtins.str
            Path to the JSON file.
        """

        with open(path) as file:
            file_data = file.read()
            json_data = json.loads(file_data)

        json_data['stage'] = 'createDataset'
        json_data['fileUuid'] = file_uuid
        del json_data["performedTasksLog"]

        return {'userInputId': user_input_id,
                'payload': json_data}

    def _create_checkin_payload_2(self, payload: dict[str, Any], config: TsCreationConfig) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        payload: builtins.dict
            Payload used in `create_checkin_payload_1`.
        config: futureexpert.checkin.TsCreationConfig
            Configuration for time series creation.
        """

        payload['payload']['rawDataReviewResults'] = {}
        payload['payload']['timeSeriesDatasetParameter'] = {
            'aggregation': {'operator': 'sum',
                            'option': config.missing_value_handler},
            'date': {
                'timeGranularity': config.time_granularity,
                'startDate': config.start_date,
                'endDate': config.end_date
            },
            'grouping': {
                'dataLevel': config.grouping_level,
                'filter':  [d.model_dump() for d in config.filter]
            },
            'values': [{snake_to_camel(key): value for key, value in d.model_dump().items()} for d in config.new_variables],
            'valueColumnsToSave': config.value_columns_to_save
        }
        payload['payload']['stage'] = 'createDataset'

        return payload

    def _create_forecast_payload(self, version: str, config: ReportConfig) -> Any:
        """Creates the payload for the forecast.

        Parameters
        ----------
        version: builtins.str
            Version of the time series that should get forecasts.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecast run.
        """

        config_dict = config.model_dump()
        config_dict['actuals_version'] = version
        config_dict['report_note'] = config_dict['title']
        config_dict['cov_selection_report_id'] = config_dict['matcher_report_id']
        config_dict['forecasting']['n_ahead'] = config_dict['forecasting']['fc_horizon']
        config_dict['backtesting'] = config_dict['method_selection']

        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            cast(list[str], config_dict['covs_versions']).append(pool_covs_checkin_result.version_id)
        config_dict.pop('pool_covs')

        config_dict.pop('title')
        config_dict['forecasting'].pop('fc_horizon')
        config_dict.pop('matcher_report_id')
        config_dict.pop('method_selection')

        payload = {'payload': config_dict}

        return payload

    def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
        """Starts a forecasting report.

        Parameters
        ----------
        version: builtins.str
            ID of a time series version.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecasting report.

        Returns
        -------
        The identifier of the forecasting report.
        """

        version_data = self.client.get_ts_version(self.group, version)
        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=version_data['customer_specific']['granularity'])

        if config.method_selection:
            config.method_selection.forecasting_methods = remove_arima_if_not_allowed(
                granularity=version_data['customer_specific']['granularity'],
                methods=config.method_selection.forecasting_methods)

            if version_data['customer_specific']['granularity'] in ['weekly', 'daily', 'hourly', 'halfhourly'] \
            and 'ARIMA' == config.method_selection.additional_cov_method:
                raise ValueError('ARIMA is not supported for granularities below monthly.')

        logger.info('Preparing data for forecast...')

        if not self.is_analyst and (config.db_name is not None or config.priority is not None):
            raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
        payload = self._create_forecast_payload(version, config)
        logger.info('Finished data preparation for forecast.')
        logger.info('Started creating forecasting report with FORECAST...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id=self.forecast_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
        logger.info('Finished report creation. Forecasts are running...')
        return ReportIdentifier.model_validate(result)

    def get_report_type(self, report_identifier: int | ReportIdentifier) -> str:
        """Gets the available reports, ordered from newest to oldest.

        Parameters
        ----------
        skip
            The number of initial elements of the report list to skip
        limit
            The limit on the length of the report list

        Returns
        -------
            String representation of the type of one report.
        """
        report_id = report_identifier.report_id if isinstance(
            report_identifier, ReportIdentifier) else report_identifier
        return self.client.get_report_type(group_id=self.group, report_id=report_id)

    def get_reports(self, skip: int = 0, limit: int = 100) -> list[ReportSummary]:
        """Gets the available reports, ordered from newest to oldest.

        Parameters
        ----------
        skip: builtins.int
            The number of initial elements of the report list to skip: builtins.int
        limit: builtins.int
            The limit on the length of the report list

        Returns
        -------
        The available reports from newest to oldest.
        """
        group_reports = self.client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
        return [ReportSummary.model_validate(report) for report in group_reports]

    def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a forecast or matcher report.

        Parameters
        ----------
        id: typing.Union
            Report identifier or plain report ID.
        include_error_reason: builtins.bool
            Determines whether log messages are to be included in the result.

        """
        fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)
        raw_result = self.client.get_report_status(
            group_id=self.group, report_id=fc_identifier.report_id, include_error_reason=include_error_reason)

        report_status = raw_result['status_summary']
        created = report_status.get('Created', 0)
        successful = report_status.get('Successful', 0)
        noeval = report_status.get('NoEvaluation', 0)
        error = report_status.get('Error', 0)
        summary = ReportStatusProgress(requested=created,
                                       pending=created - successful - noeval - error,
                                       finished=successful + noeval + error)
        results = ReportStatusResults(successful=successful,
                                      no_evaluation=noeval,
                                      error=error)

        return ReportStatus(id=fc_identifier,
                            progress=summary,
                            results=results,
                            error_reasons=raw_result.get('customer_specific', {}).get('log_messages', None))

    def get_fc_results(self,
                       id: Union[ReportIdentifier, int],
                       include_k_best_models: int = 1,
                       include_backtesting: bool = False) -> list[ForecastResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union
            Forecast identifier or plain report ID.
        include_k_best_models: builtins.int
            Number of k best models for which results are to be returned.
        include_backtesting: builtins.bool
            Determines whether backtesting results are to be returned.
        """

        if include_k_best_models < 1:
            raise ValueError('At least one model is needed.')

        if self.get_report_type(report_identifier=id) != 'MongoForecastingResultSink':
            raise ValueError('The given report ID does not belong to a FORECAST result. ' +
                             'Please input a different ID or use get_matcher_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.client.get_fc_results(group_id=self.group,
                                             report_id=report_id,
                                             include_k_best_models=include_k_best_models,
                                             include_backtesting=include_backtesting)

        return [ForecastResult(**result) for result in results]

    def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union
            Report identifier or plain report ID.
        """

        if self.get_report_type(report_identifier=id) != 'CovariateSelection':
            raise ValueError('The given report ID does not belong to a MATCHER result. ' +
                             'Please input a different ID or use get_fc_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.client.get_matcher_results(group_id=self.group,
                                                  report_id=report_id)

        return [MatcherResult(**result) for result in results]

    def start_forecast_from_raw_data(self,
                                     raw_data_source: Union[pd.DataFrame, str],
                                     config_fc: ReportConfig,
                                     data_definition: Optional[DataDefinition] = None,
                                     config_ts_creation: Optional[TsCreationConfig] = None,
                                     config_checkin: Optional[str] = None,
                                     file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
        """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

        Parameters
        ----------
        raw_data_source: typing.Union
            A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
        config_fc: futureexpert.forecast.ReportConfig
            The configuration of the forecast run.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns should be removed.
        config_ts_creation: typing.Optional
            Defines filter and aggreagtion level of the time series.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The identifier of the forecasting report.
        """
        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        res2 = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

        version = res2['result']['tsVersion']['_id']
        return self.start_forecast(version=version, config=config_fc)

    def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
        """Starts a covariate matcher report.

        Parameters
        ----------
        version
            ID of a time series version
        config: futureexpert.matcher.MatcherConfig
            Configuration of the covariate matcher report.

        Returns
        -------
        The identifier of the covariate matcher report.
        """

        version_data = self.client.get_ts_version(self.group, config.actuals_version)
        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=version_data['customer_specific']['granularity'])

        if not self.is_analyst and config.db_name is not None:
            raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')

        payload = self._create_matcher_payload(config)

        result = self.client.execute_action(group_id=self.group,
                                            core_id='cov-selection',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
        logger.info('Finished report creation.')
        return ReportIdentifier.model_validate(result)

    def _create_matcher_payload(self, config: MatcherConfig) -> Any:
        """Converts the MatcherConfig into the payload needed for the cov-selection core."""
        all_covs_versions = config.covs_versions
        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            all_covs_versions.append(pool_covs_checkin_result.version_id)

        config_dict: dict[str, Any] = {
            'report_description': config.title,
            'db_name': config.db_name,
            'data_config': {
                'actuals_version': config.actuals_version,
                'actuals_filter': config.actuals_filter,
                'covs_versions': all_covs_versions,
                'covs_filter': config.covs_filter,
            },
            "compute_config": {
                "evaluation_start_date": config.evaluation_start_date,
                "evaluation_end_date": config.evaluation_end_date,
                'max_ts_len': config.max_ts_len,
                "base_report_id": None,
                "base_report_requested_run_status": None,
                "report_update_strategy": "KEEP_OWN_RUNS",
                "cov_names": {
                    'cov_name_prefix': '',
                    'cov_name_field': 'name',
                    'cov_name_suffix': '',
                },
                "preselection": {
                    "num_obs_short_term_class": 36,
                    "max_publication_lag": config.max_publication_lag,
                },
                "postselection": {
                    "num_obs_short_term_correlation": 60,
                    "clustering_run_id": None,
                    "post_selection_queries": config.post_selection_queries,
                    "post_selection_concatenation_operator": "&",
                    "protected_selections_queries": [],
                    "protected_selections_concatenation_operator": "&"
                },
                "enable_leading_covariate_selection": config.enable_leading_covariate_selection,
                "fixed_season_length": config.fixed_season_length,
                "lag_selection": {
                    "fixed_lags": config.lag_selection.fixed_lags,
                    "min_lag": config.lag_selection.min_lag,
                    "max_lag": config.lag_selection.max_lag,
                }
            }
        }

        return {'payload': config_dict}

FutureEXPERT client.

Initializer.

Parameters

user : typing.Optional
The username for the future platform. If not provided, the username is read from environment variable FUTURE_USER.
password : typing.Optional
The password for the future platform. If not provided, the password is read from environment variable FUTURE_PW.
group : typing.Optional
Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
environment : typing.Optional
Optionally the future environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.

Static methods

def from_dotenv() ‑> ExpertClient
Expand source code
@staticmethod
def from_dotenv() -> ExpertClient:
    """Create an instance from a .env file or environment variables."""
    dotenv.load_dotenv()
    return ExpertClient()

Create an instance from a .env file or environment variables.

Methods

def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
def check_data_definition(self,
                          user_input_id: str,
                          file_uuid: str,
                          data_definition: DataDefinition,
                          file_specification: FileSpecification = FileSpecification()) -> Any:
    """Checks the data definition.

    Removes specified rows and columns. Checks if column values have any issues.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.
    """
    payload = self._create_checkin_payload_1(
        user_input_id, file_uuid, data_definition, file_specification)

    logger.info('Started data definition using CHECK-IN...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id='checkin-preprocessing',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)

    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

    logger.info('Finished data definition.')
    return result

Checks the data definition.

Removes specified rows and columns. Checks if column values have any issues.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : DataDefinition
Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.
def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition]) ‑> CheckInPoolResult
Expand source code
def check_in_pool_covs(self,
                       requested_pool_covs: list[PoolCovDefinition]) -> CheckInPoolResult:
    """Create a new version from a list of pool covariates and version ids.

    Parameters
    ----------
    requested_pool_covs: builtins.list
        List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id.
        If no version id is provided, the newest version of the covariate is used.

    Returns
    -------
    Result object with fields version_id and pool_cov_information.
    """
    logger.info('Transforming input data...')

    payload = {
        'payload': {
            'requested_indicators': [
                {**covariate.model_dump(exclude_none=True),
                 'indicator_id': covariate.pool_cov_id}
                for covariate in requested_pool_covs
            ]
        }
    }
    for covariate in payload['payload']['requested_indicators']:
        covariate.pop('pool_cov_id', None)

    logger.info('Creating time series using checkin-pool...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id='checkin-pool',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)

    logger.info('Finished time series creation.')

    return CheckInPoolResult(**result['result'])

Create a new version from a list of pool covariates and version ids.

Parameters

requested_pool_covs : builtins.list
List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used.

Returns

Result object with fields version_id and pool_cov_information.

def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> CheckInResult
Expand source code
def check_in_time_series(self,
                         raw_data_source: Union[pd.DataFrame, str],
                         data_definition: Optional[DataDefinition] = None,
                         config_ts_creation: Optional[TsCreationConfig] = None,
                         config_checkin: Optional[str] = None,
                         file_specification: FileSpecification = FileSpecification()) -> CheckInResult:
    """Checks in time series data that can be used as actuals or covariate data.

    Parameters
    ----------
    raw_data_source: typing.Union
        Data frame that contains the raw data or path to where the CSV file with the data is stored.
    data_definition: typing.Optional
        Specifies the data, value and group columns and which rows and columns are to be removed.
    config_ts_creation: typing.Optional
        Defines filter and aggreagtion level of the time series.
    config_checkin: typing.Optional
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    Id of the time series version. Used to identifiy the time series and the values of the time series.
    """
    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    response = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

    result = [TimeSeries(**ts) for ts in response['result']['timeSeries']]
    return CheckInResult(time_series=result,
                         version_id=response['result']['tsVersion']['_id'])

Checks in time series data that can be used as actuals or covariate data.

Parameters

raw_data_source : typing.Union
Data frame that contains the raw data or path to where the CSV file with the data is stored.
data_definition : typing.Optional
Specifies the data, value and group columns and which rows and columns are to be removed.
config_ts_creation : typing.Optional
Defines filter and aggreagtion level of the time series.
config_checkin : typing.Optional
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

Id of the time series version. Used to identifiy the time series and the values of the time series.

def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
def create_time_series(self,
                       user_input_id: str,
                       file_uuid: str,
                       data_definition: Optional[DataDefinition] = None,
                       config_ts_creation: Optional[TsCreationConfig] = None,
                       config_checkin: Optional[str] = None,
                       file_specification: FileSpecification = FileSpecification()) -> Any:
    """Last step of the CHECK-IN process which creates the time series.

    Aggregates the data and saves them to the database.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: typing.Optional
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.
    config_ts_creation: typing.Optional
        Configuration for the time series creation.
    config_checkin: typing.Optional
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the _future_ frontend (now.future-forecasting.de).
    """
    logger.info('Transforming input data...')

    if config_ts_creation is None and config_checkin is None:
        raise ValueError('No configuration source is provided.')

    if config_ts_creation is not None and config_checkin is not None:
        raise ValueError('Only one configuration source can be processed.')

    if config_checkin is None and (data_definition is None or config_ts_creation is None):
        raise ValueError(
            'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.')

    if config_ts_creation is not None and data_definition is not None:
        payload_1 = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)
        payload = self._create_checkin_payload_2(payload_1, config_ts_creation)
    if config_checkin is not None:
        payload = self._build_payload_from_ui_config(
            user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)

    logger.info('Creating time series using CHECK-IN...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id='checkin-preprocessing',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)
    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

    logger.info('Finished time series creation.')

    return result

Last step of the CHECK-IN process which creates the time series.

Aggregates the data and saves them to the database.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : typing.Optional
Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.
config_ts_creation : typing.Optional
Configuration for the time series creation.
config_checkin : typing.Optional
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False) ‑> list[ForecastResult]
Expand source code
def get_fc_results(self,
                   id: Union[ReportIdentifier, int],
                   include_k_best_models: int = 1,
                   include_backtesting: bool = False) -> list[ForecastResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union
        Forecast identifier or plain report ID.
    include_k_best_models: builtins.int
        Number of k best models for which results are to be returned.
    include_backtesting: builtins.bool
        Determines whether backtesting results are to be returned.
    """

    if include_k_best_models < 1:
        raise ValueError('At least one model is needed.')

    if self.get_report_type(report_identifier=id) != 'MongoForecastingResultSink':
        raise ValueError('The given report ID does not belong to a FORECAST result. ' +
                         'Please input a different ID or use get_matcher_results().')

    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    results = self.client.get_fc_results(group_id=self.group,
                                         report_id=report_id,
                                         include_k_best_models=include_k_best_models,
                                         include_backtesting=include_backtesting)

    return [ForecastResult(**result) for result in results]

Gets the results from the given report.

Parameters

id : typing.Union
Forecast identifier or plain report ID.
include_k_best_models : builtins.int
Number of k best models for which results are to be returned.
include_backtesting : builtins.bool
Determines whether backtesting results are to be returned.
def get_matcher_results(self, id: Union[ReportIdentifier, int]) ‑> list[MatcherResult]
Expand source code
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union
        Report identifier or plain report ID.
    """

    if self.get_report_type(report_identifier=id) != 'CovariateSelection':
        raise ValueError('The given report ID does not belong to a MATCHER result. ' +
                         'Please input a different ID or use get_fc_results().')

    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    results = self.client.get_matcher_results(group_id=self.group,
                                              report_id=report_id)

    return [MatcherResult(**result) for result in results]

Gets the results from the given report.

Parameters

id : typing.Union
Report identifier or plain report ID.
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview
Expand source code
def get_pool_cov_overview(self,
                          granularity: Optional[str] = None,
                          search: Optional[str] = None) -> PoolCovOverview:
    """Gets an overview of all covariates available on POOL according to the given filters.

    Parameters
    ----------
    granularity: typing.Optional
        If set, returns only data matching that granularity (Day or Month).
    search: typing.Optional
        If set, performs a full-text search and only returns data found in that search.

    Returns
    -------
    PoolCovOverview object with tables containing the covariates with
    different levels of detail .
    """
    response_json = self.client.get_pool_cov_overview(granularity=granularity, search=search)
    return PoolCovOverview(response_json)

Gets an overview of all covariates available on POOL according to the given filters.

Parameters

granularity : typing.Optional
If set, returns only data matching that granularity (Day or Month).
search : typing.Optional
If set, performs a full-text search and only returns data found in that search.

Returns

PoolCovOverview object with tables containing the covariates with
 

different levels of detail .

def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) ‑> ReportStatus
Expand source code
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
    """Gets the current status of a forecast or matcher report.

    Parameters
    ----------
    id: typing.Union
        Report identifier or plain report ID.
    include_error_reason: builtins.bool
        Determines whether log messages are to be included in the result.

    """
    fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)
    raw_result = self.client.get_report_status(
        group_id=self.group, report_id=fc_identifier.report_id, include_error_reason=include_error_reason)

    report_status = raw_result['status_summary']
    created = report_status.get('Created', 0)
    successful = report_status.get('Successful', 0)
    noeval = report_status.get('NoEvaluation', 0)
    error = report_status.get('Error', 0)
    summary = ReportStatusProgress(requested=created,
                                   pending=created - successful - noeval - error,
                                   finished=successful + noeval + error)
    results = ReportStatusResults(successful=successful,
                                  no_evaluation=noeval,
                                  error=error)

    return ReportStatus(id=fc_identifier,
                        progress=summary,
                        results=results,
                        error_reasons=raw_result.get('customer_specific', {}).get('log_messages', None))

Gets the current status of a forecast or matcher report.

Parameters

id : typing.Union
Report identifier or plain report ID.
include_error_reason : builtins.bool
Determines whether log messages are to be included in the result.
def get_report_type(self, report_identifier: int | ReportIdentifier) ‑> str
Expand source code
def get_report_type(self, report_identifier: int | ReportIdentifier) -> str:
    """Gets the available reports, ordered from newest to oldest.

    Parameters
    ----------
    skip
        The number of initial elements of the report list to skip
    limit
        The limit on the length of the report list

    Returns
    -------
        String representation of the type of one report.
    """
    report_id = report_identifier.report_id if isinstance(
        report_identifier, ReportIdentifier) else report_identifier
    return self.client.get_report_type(group_id=self.group, report_id=report_id)

Gets the available reports, ordered from newest to oldest.

Parameters

skip
The number of initial elements of the report list to skip
limit
The limit on the length of the report list

Returns

String representation of the type of one report.
def get_reports(self, skip: int = 0, limit: int = 100) ‑> list[ReportSummary]
Expand source code
def get_reports(self, skip: int = 0, limit: int = 100) -> list[ReportSummary]:
    """Gets the available reports, ordered from newest to oldest.

    Parameters
    ----------
    skip: builtins.int
        The number of initial elements of the report list to skip: builtins.int
    limit: builtins.int
        The limit on the length of the report list

    Returns
    -------
    The available reports from newest to oldest.
    """
    group_reports = self.client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
    return [ReportSummary.model_validate(report) for report in group_reports]

Gets the available reports, ordered from newest to oldest.

Parameters

skip : builtins.int
The number of initial elements of the report list to skip: builtins.int
limit : builtins.int
The limit on the length of the report list

Returns

The available reports from newest to oldest.

def start_forecast(self,
version: str,
config: ReportConfig) ‑> ReportIdentifier
Expand source code
def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
    """Starts a forecasting report.

    Parameters
    ----------
    version: builtins.str
        ID of a time series version.
    config: futureexpert.forecast.ReportConfig
        Configuration of the forecasting report.

    Returns
    -------
    The identifier of the forecasting report.
    """

    version_data = self.client.get_ts_version(self.group, version)
    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=version_data['customer_specific']['granularity'])

    if config.method_selection:
        config.method_selection.forecasting_methods = remove_arima_if_not_allowed(
            granularity=version_data['customer_specific']['granularity'],
            methods=config.method_selection.forecasting_methods)

        if version_data['customer_specific']['granularity'] in ['weekly', 'daily', 'hourly', 'halfhourly'] \
        and 'ARIMA' == config.method_selection.additional_cov_method:
            raise ValueError('ARIMA is not supported for granularities below monthly.')

    logger.info('Preparing data for forecast...')

    if not self.is_analyst and (config.db_name is not None or config.priority is not None):
        raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
    payload = self._create_forecast_payload(version, config)
    logger.info('Finished data preparation for forecast.')
    logger.info('Started creating forecasting report with FORECAST...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id=self.forecast_core_id,
                                        payload=payload,
                                        interval_status_check_in_seconds=2)
    logger.info('Finished report creation. Forecasts are running...')
    return ReportIdentifier.model_validate(result)

Starts a forecasting report.

Parameters

version : builtins.str
ID of a time series version.
config : ReportConfig
Configuration of the forecasting report.

Returns

The identifier of the forecasting report.

def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier
Expand source code
def start_forecast_from_raw_data(self,
                                 raw_data_source: Union[pd.DataFrame, str],
                                 config_fc: ReportConfig,
                                 data_definition: Optional[DataDefinition] = None,
                                 config_ts_creation: Optional[TsCreationConfig] = None,
                                 config_checkin: Optional[str] = None,
                                 file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
    """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

    Parameters
    ----------
    raw_data_source: typing.Union
        A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
    config_fc: futureexpert.forecast.ReportConfig
        The configuration of the forecast run.
    data_definition: typing.Optional
        Specifies the data, value and group columns and which rows and columns should be removed.
    config_ts_creation: typing.Optional
        Defines filter and aggreagtion level of the time series.
    config_checkin: typing.Optional
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The identifier of the forecasting report.
    """
    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    res2 = self.create_time_series(user_input_id=user_input_id,
                                   file_uuid=file_id,
                                   data_definition=data_definition,
                                   config_ts_creation=config_ts_creation,
                                   config_checkin=config_checkin,
                                   file_specification=file_specification)

    version = res2['result']['tsVersion']['_id']
    return self.start_forecast(version=version, config=config_fc)

Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

Parameters

raw_data_source : typing.Union
A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc : ReportConfig
The configuration of the forecast run.
data_definition : typing.Optional
Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation : typing.Optional
Defines filter and aggreagtion level of the time series.
config_checkin : typing.Optional
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

The identifier of the forecasting report.

def start_matcher(self,
config: MatcherConfig) ‑> ReportIdentifier
Expand source code
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
    """Starts a covariate matcher report.

    Parameters
    ----------
    version
        ID of a time series version
    config: futureexpert.matcher.MatcherConfig
        Configuration of the covariate matcher report.

    Returns
    -------
    The identifier of the covariate matcher report.
    """

    version_data = self.client.get_ts_version(self.group, config.actuals_version)
    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=version_data['customer_specific']['granularity'])

    if not self.is_analyst and config.db_name is not None:
        raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')

    payload = self._create_matcher_payload(config)

    result = self.client.execute_action(group_id=self.group,
                                        core_id='cov-selection',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)
    logger.info('Finished report creation.')
    return ReportIdentifier.model_validate(result)

Starts a covariate matcher report.

Parameters

version
ID of a time series version
config : MatcherConfig
Configuration of the covariate matcher report.

Returns

The identifier of the covariate matcher report.

def switch_group(self, new_group: str, verbose: bool = True) ‑> None
Expand source code
def switch_group(self, new_group: str, verbose: bool = True) -> None:
    """Switches the current group.

    Parameters
    ----------
    new_group: builtins.str
        The name of the group to activate.
    verbose: builtins.bool
        If enabled, shows the group name in the log message.
    """
    if new_group not in self.client.userinfo['groups']:
        raise RuntimeError(f'You are not authorized to access group {new_group}')
    self.group = new_group
    verbose_text = f' for group {self.group}' if verbose else ''
    logger.info(f'Successfully logged in{verbose_text}.')

Switches the current group.

Parameters

new_group : builtins.str
The name of the group to activate.
verbose : builtins.bool
If enabled, shows the group name in the log message.
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any
Expand source code
def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any:
    """Uploads the given raw data for further processing.

    Parameters
    ----------
    source: typing.Union
        Path to a CSV file or a pandas data frame.
    file_specification: typing.Optional
        If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones.
        The parameter has no effect if source is a path to a CSV file.

    Returns
    -------
    Identifier for the user Inputs.
    """
    df_file = None
    if isinstance(source, pd.DataFrame):
        if not file_specification:
            file_specification = FileSpecification()
        csv = source.to_csv(index=False, sep=file_specification.delimiter,
                            decimal=file_specification.decimal, encoding='utf-8-sig')
        time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
        df_file = (f'expert-{time_stamp}.csv', csv)
        path = None
    else:
        path = source

    # TODO: currently only one file is supported here.
    upload_feedback = self.client.upload_user_inputs_for_group(
        self.group, path, df_file)

    return upload_feedback

Uploads the given raw data for further processing.

Parameters

source : typing.Union
Path to a CSV file or a pandas data frame.
file_specification : typing.Optional
If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file.

Returns

Identifier for the user Inputs.

class FileSpecification (**data: Any)
Expand source code
class FileSpecification(BaseConfig):
    """Specify the format of the CSV file.

    Parameters
    ----------
    delimiter: typing.Optional
        The delimiter used to separate values.
    decimal: typing.Optional
        The decimal character used in decimal numbers.
    thousands: typing.Optional
        The thousands separator used in numbers.
    """
    delimiter: Optional[str] = ','
    decimal: Optional[str] = '.'
    thousands: Optional[str] = None

Specify the format of the CSV file.

Parameters

delimiter : typing.Optional
The delimiter used to separate values.
decimal : typing.Optional
The decimal character used in decimal numbers.
thousands : typing.Optional
The thousands separator used in numbers.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var decimal : str | None
var delimiter : str | None
var model_config
var thousands : str | None
class FilterSettings (**data: Any)
Expand source code
class FilterSettings(BaseConfig):
    """Model for the filters.

    Parameters
    ----------
    type: typing.Literal
        The type of filter: `exclusion` or `inclusion`.
    variable: builtins.str
        The columns name to be used for filtering.
    items: builtins.list
        The list of values to be used for filtering.
    """
    type: Literal['exclusion', 'inclusion']
    variable: str
    items: list[str]

Model for the filters.

Parameters

type : typing.Literal
The type of filter: exclusion or inclusion.
variable : builtins.str
The columns name to be used for filtering.
items : builtins.list
The list of values to be used for filtering.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var items : list[str]
var model_config
var type : Literal['exclusion', 'inclusion']
var variable : str
class ForecastingConfig (**data: Any)
Expand source code
class ForecastingConfig(BaseConfig):
    """Forecasting configuration.

    Parameters
    ----------
    fc_horizon: futureexpert.shared_models.PositiveInt
        Forecast horizon.
    lower_bound: typing.Optional
        Lower bound applied to the time series and forecasts.
    upper_bound: typing.Optional
        Upper bound applied to the time series and forecasts.
    confidence_level: builtins.float
        Confidence level for prediction intervals.
    round_forecast_to_integer: builtins.bool
        If true, then forecasts are rounded to the nearest integer (also applied during backtesting).
    use_ensemble: builtins.bool
        If true, then calculate ensemble forecasts. Automatically makes a smart decision on which
        methods to use based on their backtesting performance.
    """

    fc_horizon: Annotated[ValidatedPositiveInt, pydantic.Field(ge=1, le=60)]
    lower_bound: Union[float, None] = None
    upper_bound: Union[float, None] = None
    confidence_level: float = 0.75
    round_forecast_to_integer: bool = False
    use_ensemble: bool = False

    @property
    def numeric_bounds(self) -> tuple[float, float]:
        return (
            self.lower_bound if self.lower_bound is not None else -np.inf,
            self.upper_bound if self.upper_bound is not None else np.inf,
        )

Forecasting configuration.

Parameters

fc_horizon : PositiveInt
Forecast horizon.
lower_bound : typing.Optional
Lower bound applied to the time series and forecasts.
upper_bound : typing.Optional
Upper bound applied to the time series and forecasts.
confidence_level : builtins.float
Confidence level for prediction intervals.
round_forecast_to_integer : builtins.bool
If true, then forecasts are rounded to the nearest integer (also applied during backtesting).
use_ensemble : builtins.bool
If true, then calculate ensemble forecasts. Automatically makes a smart decision on which methods to use based on their backtesting performance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var confidence_level : float
var fc_horizonPositiveInt
var lower_bound : float | None
var model_config
var round_forecast_to_integer : bool
var upper_bound : float | None
var use_ensemble : bool

Instance variables

prop numeric_bounds : tuple[float, float]
Expand source code
@property
def numeric_bounds(self) -> tuple[float, float]:
    return (
        self.lower_bound if self.lower_bound is not None else -np.inf,
        self.upper_bound if self.upper_bound is not None else np.inf,
    )
class LagSelectionConfig (**data: Any)
Expand source code
class LagSelectionConfig(BaseModel):
    """Configures covariate lag selection.

    Parameters
    ----------
    fixed_lags: typing.Optional
        Lags that are tested in the lag selection.
    min_lag: typing.Optional
        Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate
        is shifted 3 data points into the future.
    max_lag: typing.Optional
        Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate
        is shifted 12 data points into the future.
    """
    min_lag: Optional[int] = None
    max_lag: Optional[int] = None
    fixed_lags: Optional[list[int]] = None

    @model_validator(mode='after')
    def _check_range(self) -> Self:
        if (self.min_lag is None) ^ (self.max_lag is None):
            raise ValueError(
                'If one of `min_lag` and `max_lag` is set the other one also needs to be set.')

        if self.min_lag and self.max_lag:
            if self.fixed_lags is not None:
                raise ValueError('Fixed lags and min/max lag are mutually exclusive.')
            if self.max_lag < self.min_lag:
                raise ValueError('max_lag needs to be greater or equal to min_lag.')
            lag_range = abs(self.max_lag - self.min_lag) + 1
            if lag_range > 15:
                raise ValueError(f'Only 15 lags are allowed to be tested. The requested range has length {lag_range}.')

        if self.fixed_lags and len(self.fixed_lags) > 15:
            raise ValueError(
                f'Only 15 lags are allowed to be tested. The provided fixed lags has length {len(self.fixed_lags)}.')

        return self

Configures covariate lag selection.

Parameters

fixed_lags : typing.Optional
Lags that are tested in the lag selection.
min_lag : typing.Optional
Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate is shifted 3 data points into the future.
max_lag : typing.Optional
Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate is shifted 12 data points into the future.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var fixed_lags : list[int] | None
var max_lag : int | None
var min_lag : int | None
var model_config
class MatcherConfig (**data: Any)
Expand source code
class MatcherConfig(BaseConfig):
    """Configuration for a MATCHER run.

    Parameters
    ----------
    title: builtins.str
        A short description of the report.
    actuals_version: builtins.str
        The version ID of the actuals.
    covs_versions: builtins.list
        List of versions of the covariates.
    actuals_filter: builtins.dict
        Filter criterion for actuals time series. The given actuals version is
        automatically added as additional filter criterion. Possible Filter criteria are all fields that are part
        of the TimeSeries class. e.g. {'name': 'Sales'}
        For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    covs_filter: builtins.dict
        Filter criterion for covariates time series. The given covariate version is
        automatically added as additional filter criterion. Possible Filter criteria are all fields that are part
        of the TimeSeries class. e.g. {'name': 'Sales'}
        For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    max_ts_len: typing.Optional
        At most this number of most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG
        for allowed configuration.
    lag_selection: futureexpert.matcher.LagSelectionConfig
        Configuration of covariate lag selection.
    evaluation_start_date: typing.Optional
        Optional start date for the evaluation. The input should be in the ISO format
        with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00".
        Actuals and covariate observations prior to this start date are dropped.
    evaluation_end_date: typing.Optional
        Optional end date for the evaluation. The input should be in the ISO format
        with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00".
        Actuals and covariate observations after this end date are dropped.
    max_publication_lag: builtins.int
        Maximal publication lag for the covariates. The publication lag of a covariate
        is the number of most recent observations (compared to the actuals) that are
        missing for the covariate. E.g., if the actuals (for monthly granularity) end
        in April 2023 but the covariate ends in February 2023, the covariate has a
        publication lag of 2.
    post_selection_queries: builtins.list
        List of queries that are executed on the ranking summary DataFrame. Only ranking entries that
        match the queries are kept. The query strings need to satisfy the pandas query syntax
        (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns
        of the ranking summary DataFrame that you might want to filter on:

        Column Name          |      Data Type   |    Description
        -----------------------------------------------------------------------------------------------
        Lag                  |          Int64   |    Lag of the covariate.
        Rank                 |        float64   |    Rank of the model.
        BetterThanNoCov      |           bool   |    Indicates whether the model is better than the non-cov model.
    enable_leading_covariate_selection: builtins.bool
        When True, all covariates after the lag is applied that do not have at least one more
        datapoint beyond the the time period covered by actuals are removed from the candidate
        covariates passed to covariate selection.
    fixed_season_length: typing.Optional
        An optional parameter specifying the length of a season in the dataset.
    pool_covs: typing.Optional
        List of covariate definitions.
    db_name: typing.Optional
        Only accessible for internal use. Name of the database to use for storing the results.
    """
    title: str
    actuals_version: str
    covs_versions: list[str] = Field(default_factory=list)
    actuals_filter: dict[str, Any] = Field(default_factory=dict)
    covs_filter: dict[str, Any] = Field(default_factory=dict)
    max_ts_len: Annotated[
        Optional[int], pydantic.Field(ge=1, le=1500)] = None
    lag_selection: LagSelectionConfig = LagSelectionConfig()
    evaluation_start_date: Optional[str] = None
    evaluation_end_date: Optional[str] = None
    max_publication_lag: int = 2
    post_selection_queries: list[str] = []
    enable_leading_covariate_selection: bool = True
    fixed_season_length: Optional[int] = None
    pool_covs: Optional[list[PoolCovDefinition]] = None
    db_name: Optional[str] = None

    @model_validator(mode='after')
    def _validate_post_selection_queries(self) -> Self:
        # Validate the post-selection queries.
        invalid_queries = []
        columns = {
            'Lag': 'int',
            'Rank': 'float',
            'BetterThanNoCov': 'bool'
        }
        # Create an empty DataFrame with the specified column names and data types
        validation_df = pd.DataFrame(columns=columns.keys()).astype(columns)
        for postselection_query in self.post_selection_queries:
            try:
                validation_df.query(postselection_query, )
            except Exception:
                invalid_queries.append(postselection_query)

        if len(invalid_queries):
            raise ValueError("The following post-selection queries are invalidly formatted: "
                             f"{', '.join(invalid_queries)}. ")

        return self

Configuration for a MATCHER run.

Parameters

title : builtins.str
A short description of the report.
actuals_version : builtins.str
The version ID of the actuals.
covs_versions : builtins.list
List of versions of the covariates.
actuals_filter : builtins.dict
Filter criterion for actuals time series. The given actuals version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
covs_filter : builtins.dict
Filter criterion for covariates time series. The given covariate version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len : typing.Optional
At most this number of most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
lag_selection : LagSelectionConfig
Configuration of covariate lag selection.
evaluation_start_date : typing.Optional
Optional start date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations prior to this start date are dropped.
evaluation_end_date : typing.Optional
Optional end date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations after this end date are dropped.
max_publication_lag : builtins.int
Maximal publication lag for the covariates. The publication lag of a covariate is the number of most recent observations (compared to the actuals) that are missing for the covariate. E.g., if the actuals (for monthly granularity) end in April 2023 but the covariate ends in February 2023, the covariate has a publication lag of 2.
post_selection_queries : builtins.list

List of queries that are executed on the ranking summary DataFrame. Only ranking entries that match the queries are kept. The query strings need to satisfy the pandas query syntax (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns of the ranking summary DataFrame that you might want to filter on:

Column Name | Data Type | Description

Lag | Int64 | Lag of the covariate. Rank | float64 | Rank of the model. BetterThanNoCov | bool | Indicates whether the model is better than the non-cov model.

enable_leading_covariate_selection : builtins.bool
When True, all covariates after the lag is applied that do not have at least one more datapoint beyond the the time period covered by actuals are removed from the candidate covariates passed to covariate selection.
fixed_season_length : typing.Optional
An optional parameter specifying the length of a season in the dataset.
pool_covs : typing.Optional
List of covariate definitions.
db_name : typing.Optional
Only accessible for internal use. Name of the database to use for storing the results.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var actuals_filter : dict[str, typing.Any]
var actuals_version : str
var covs_filter : dict[str, typing.Any]
var covs_versions : list[str]
var db_name : str | None
var enable_leading_covariate_selection : bool
var evaluation_end_date : str | None
var evaluation_start_date : str | None
var fixed_season_length : int | None
var lag_selectionLagSelectionConfig
var max_publication_lag : int
var max_ts_len : int | None
var model_config
var pool_covs : list[PoolCovDefinition] | None
var post_selection_queries : list[str]
var title : str
class MethodSelectionConfig (**data: Any)
Expand source code
class MethodSelectionConfig(BaseConfig):
    """Method selection configuration.

    Parameters
    ----------
    number_iterations: futureexpert.shared_models.PositiveInt
        Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals.
    shift_len: futureexpert.shared_models.PositiveInt
        Number of time points by which the test window is shifted between backtesting iterations.
    refit: builtins.bool
        If true, then models are refitted for each backtesting iteration.
    default_error_metric: typing.Literal
        Error metric applied to the backtesting error for non-sporadic time series.
    sporadic_error_metric: typing.Literal
        Error metric applied to the backtesting errors for sporadic time series.
    additional_accuracy_measures: builtins.list
        Additional accuracy measures computed during model ranking.
    step_weights: typing.Optional
        Mapping from forecast steps to weights associated with forecast errors for the given forecasting step.
        Only positive weights are allowed. Leave a forecast step out to assign a zero weight.
        Used only for non-sporadic time series. If empty, all forecast steps are weighted equally.
    additional_cov_method: typing.Optional
        Define up to one additional method that uses the defined covariates for creating forecasts. Will not be
        calculated if deemed unfit by the preselection. If the parameter forecasting_methods: typing.Sequence
        is defined, the additional cov method must appear in that list, too.
    cov_combination: typing.Literal
        Create a forecast model for each individual covariate (single)
        or a model using all covariates together (joint).
    forecasting_methods: typing.Sequence
        Define specific forecasting methods to be tested for generating forecasts.
        Specifying fewer methods can significantly reduce the runtime of forecast creation.
        If not specified, all available forecasting methods will be used by default.
        Given methods are automatically preselected based on time series characteristics of your data.
        If none of the given methods fits your data, a fallback set of forecasting methods will be used instead.
    """

    number_iterations: Annotated[ValidatedPositiveInt, pydantic.Field(ge=1, le=24)] = PositiveInt(12)
    shift_len: ValidatedPositiveInt = PositiveInt(1)
    refit: bool = False
    default_error_metric: Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape'] = 'mse'
    sporadic_error_metric: Literal['pis', 'sapis', 'acr', 'mar', 'msr'] = 'pis'
    additional_accuracy_measures: list[Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis',
                                               'acr', 'mar', 'msr']] = pydantic.Field(default_factory=list)
    step_weights: Optional[dict[ValidatedPositiveInt, pydantic.PositiveFloat]] = None

    additional_cov_method: Optional[AdditionalCovMethod] = None
    cov_combination: Literal['single', 'joint'] = 'single'
    forecasting_methods: Sequence[ForecastingMethods] = pydantic.Field(default_factory=list)

Method selection configuration.

Parameters

number_iterations : PositiveInt
Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals.
shift_len : PositiveInt
Number of time points by which the test window is shifted between backtesting iterations.
refit : builtins.bool
If true, then models are refitted for each backtesting iteration.
default_error_metric : typing.Literal
Error metric applied to the backtesting error for non-sporadic time series.
sporadic_error_metric : typing.Literal
Error metric applied to the backtesting errors for sporadic time series.
additional_accuracy_measures : builtins.list
Additional accuracy measures computed during model ranking.
step_weights : typing.Optional
Mapping from forecast steps to weights associated with forecast errors for the given forecasting step. Only positive weights are allowed. Leave a forecast step out to assign a zero weight. Used only for non-sporadic time series. If empty, all forecast steps are weighted equally.
additional_cov_method : typing.Optional
Define up to one additional method that uses the defined covariates for creating forecasts. Will not be calculated if deemed unfit by the preselection. If the parameter forecasting_methods: typing.Sequence is defined, the additional cov method must appear in that list, too.
cov_combination : typing.Literal
Create a forecast model for each individual covariate (single) or a model using all covariates together (joint).
forecasting_methods : typing.Sequence
Define specific forecasting methods to be tested for generating forecasts. Specifying fewer methods can significantly reduce the runtime of forecast creation. If not specified, all available forecasting methods will be used by default. Given methods are automatically preselected based on time series characteristics of your data. If none of the given methods fits your data, a fallback set of forecasting methods will be used instead.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var additional_accuracy_measures : list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]
var additional_cov_method : Literal['AdaBoost', 'ARIMA', 'CART', 'CatBoost', 'ExtraTrees', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost'] | None
var cov_combination : Literal['single', 'joint']
var default_error_metric : Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']
var forecasting_methods : Sequence[Literal['AdaBoost', 'Aft4Sporadic', 'ARIMA', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
var model_config
var number_iterationsPositiveInt
var refit : bool
var shift_lenPositiveInt
var sporadic_error_metric : Literal['pis', 'sapis', 'acr', 'mar', 'msr']
var step_weights : dict[PositiveInt, float] | None
class PreprocessingConfig (**data: Any)
Expand source code
class PreprocessingConfig(BaseConfig):
    """Preprocessing configuration.

    Parameters
    ----------
    remove_leading_zeros: builtins.bool
        If true, then leading zeros are removed from the time series before forecasting. Is only applied
        if the time series has at least 5 values, including missing values.
    use_season_detection: builtins.bool
        If true, then the season length is determined from the data.
    seasonalities_to_test: typing.Optional
        Season lengths to be tested. If not defined, a suitable set for the given granularity is used.
        Season lengths can only be tested, if the number of observations is at least three times as
        long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should
        be considered, too. Allows a combination of single granularities or combinations of granularities.
    fixed_seasonalities: typing.Optional
        Season lengths used without checking. Allowed only if `use_season_detection` is false.
    detect_outliers: builtins.bool
        If true, then identifies outliers in the data.
    replace_outliers: builtins.bool
        If true, then identified outliers are replaced.
    detect_changepoints: builtins.bool
        If true, then change points such as level shifts are identified.
    detect_quantization: builtins.bool
        If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic
        time series data and, if one has been detected, applies it to the forecasts.
    """

    remove_leading_zeros: bool = False
    use_season_detection: bool = True
    # empty lists and None are treated the same in apollon
    seasonalities_to_test: Optional[list[Union[list[ValidatedPositiveInt], ValidatedPositiveInt]]] = None
    fixed_seasonalities: Optional[list[ValidatedPositiveInt]] = None
    detect_outliers: bool = False
    replace_outliers: bool = False
    detect_changepoints: bool = False
    detect_quantization: bool = False

    @pydantic.model_validator(mode='after')
    def _has_no_fixed_seasonalities_if_uses_season_detection(self) -> Self:
        if self.use_season_detection and self.fixed_seasonalities:
            raise ValueError('If fixed seasonalities is enabled, then season detection must be off.')

        return self

Preprocessing configuration.

Parameters

remove_leading_zeros : builtins.bool
If true, then leading zeros are removed from the time series before forecasting. Is only applied if the time series has at least 5 values, including missing values.
use_season_detection : builtins.bool
If true, then the season length is determined from the data.
seasonalities_to_test : typing.Optional
Season lengths to be tested. If not defined, a suitable set for the given granularity is used. Season lengths can only be tested, if the number of observations is at least three times as long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should be considered, too. Allows a combination of single granularities or combinations of granularities.
fixed_seasonalities : typing.Optional
Season lengths used without checking. Allowed only if use_season_detection is false.
detect_outliers : builtins.bool
If true, then identifies outliers in the data.
replace_outliers : builtins.bool
If true, then identified outliers are replaced.
detect_changepoints : builtins.bool
If true, then change points such as level shifts are identified.
detect_quantization : builtins.bool
If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic time series data and, if one has been detected, applies it to the forecasts.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var detect_changepoints : bool
var detect_outliers : bool
var detect_quantization : bool
var fixed_seasonalities : list[PositiveInt] | None
var model_config
var remove_leading_zeros : bool
var replace_outliers : bool
var seasonalities_to_test : list[list[PositiveInt] | PositiveInt] | None
var use_season_detection : bool
class ReportConfig (**data: Any)
Expand source code
class ReportConfig(BaseConfig):
    """Forecast run configuration.

    Parameters
    ----------
    matcher_report_id: typing.Optional
        Report ID of the covariate matcher.
    covs_versions: builtins.list
        List of versions of the covariates.
    covs_configuration: typing.Optional
        Mapping from actuals and covariates. Use for custom covariate or adjusted matcher results.
        If the matcher results should be used without changes use `matcher_report_id` instead.
    title: builtins.str
        Title of the report.
    actuals_filter: builtins.dict
        Filter criterion for actuals time series. The given actuals version is
        automatically added as additional filter criterion. Possible Filter criteria are all fields that are part
        of the TimeSeries class. e.g. {'name': 'Sales'}
        For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
    max_ts_len: typing.Optional
        At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG
        for allowed configuration.
    preprocessing: futureexpert.forecast.PreprocessingConfig
        Preprocessing configuration.
    forecasting: futureexpert.forecast.ForecastingConfig
        Forecasting configuration.
    method_selection: typing.Optional
        Method selection configuration. If not supplied, then a granularity dependent default is used.
    pool_covs: typing.Optional
        List of covariate definitions.
    db_name: typing.Optional
        Only accessible for internal use. Name of the database to use for storing the results.
    priority: typing.Optional
        Only accessible for internal use. Higher value indicate higher priority.
    """

    matcher_report_id: Optional[int] = None
    covs_versions: list[str] = Field(default_factory=list)
    covs_configuration: Optional[list[ActualsCovsConfiguration]] = None
    title: str
    actuals_filter: dict[str, Any] = Field(default_factory=dict)

    max_ts_len: Optional[int] = None

    preprocessing: PreprocessingConfig = PreprocessingConfig()
    forecasting: ForecastingConfig
    pool_covs: Optional[list[PoolCovDefinition]] = None
    method_selection: Optional[MethodSelectionConfig] = None
    db_name:  Optional[str] = None
    priority: Annotated[Optional[int], pydantic.Field(ge=0, le=10)] = None

    @pydantic.model_validator(mode="after")
    def _correctness_of_cov_configurations(self) -> Self:
        if (self.matcher_report_id or self.covs_configuration) and (
                self.covs_versions is None and self.pool_covs is None):
            raise ValueError(
                'If one of `matcher_report_id` and `covs_configuration` is set also `covs_versions` needs to be set.')
        if (self.matcher_report_id is None and self.covs_configuration is None) and (
                self.covs_versions or self.pool_covs):
            raise ValueError(
                'If `covs_versions` or `pool_covs` is set ' +
                'either `matcher_report_id` or `covs_configuration` needs to be set.')
        if self.covs_configuration is not None and len(self.covs_configuration) == 0:
            raise ValueError('`covs_configuration` has length zero and therefore won`t have any effect. '
                             'Please remove the parameter or set to None.')
        return self

    @pydantic.model_validator(mode="after")
    def _only_one_covariate_definition(self) -> Self:
        fields = [
            'matcher_report_id',
            'pool_covs'
        ]

        set_fields = [field for field in fields if getattr(self, field) is not None]

        if len(set_fields) > 1:
            raise ValueError(f"Only one of {', '.join(fields)} can be set. Found: {', '.join(set_fields)}")

        return self

    @pydantic.model_validator(mode="after")
    def _backtesting_step_weights_refer_to_valid_forecast_steps(self) -> Self:
        if (self.method_selection
            and self.method_selection.step_weights
                and max(self.method_selection.step_weights.keys()) > self.forecasting.fc_horizon):
            raise ValueError('Step weights must not refer to forecast steps beyond the fc_horizon.')

        return self

    @pydantic.model_validator(mode="after")
    def _valid_covs_version(self) -> Self:
        for covs_version in self.covs_versions:
            if re.match('^[0-9a-f]{24}$', covs_version) is None:
                raise ValueError(f'Given covs_version "{covs_version}" is not a valid ObjectId.')
        return self

Forecast run configuration.

Parameters

matcher_report_id : typing.Optional
Report ID of the covariate matcher.
covs_versions : builtins.list
List of versions of the covariates.
covs_configuration : typing.Optional
Mapping from actuals and covariates. Use for custom covariate or adjusted matcher results. If the matcher results should be used without changes use matcher_report_id instead.
title : builtins.str
Title of the report.
actuals_filter : builtins.dict
Filter criterion for actuals time series. The given actuals version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len : typing.Optional
At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
preprocessing : PreprocessingConfig
Preprocessing configuration.
forecasting : ForecastingConfig
Forecasting configuration.
method_selection : typing.Optional
Method selection configuration. If not supplied, then a granularity dependent default is used.
pool_covs : typing.Optional
List of covariate definitions.
db_name : typing.Optional
Only accessible for internal use. Name of the database to use for storing the results.
priority : typing.Optional
Only accessible for internal use. Higher value indicate higher priority.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var actuals_filter : dict[str, typing.Any]
var covs_configuration : list[ActualsCovsConfiguration] | None
var covs_versions : list[str]
var db_name : str | None
var forecastingForecastingConfig
var matcher_report_id : int | None
var max_ts_len : int | None
var method_selectionMethodSelectionConfig | None
var model_config
var pool_covs : list[PoolCovDefinition] | None
var preprocessingPreprocessingConfig
var priority : int | None
var title : str
class TsCreationConfig (**data: Any)
Expand source code
class TsCreationConfig(BaseConfig):
    """Model for the time series creation configuration.

    Parameters
    ----------
    time_granularity: typing.Literal
        Target granularity of the time series.
    start_date: typing.Optional
        Dates before this date are excluded.
    end_date: typing.Optional
        Dates after this date are excluded.
    grouping_level: builtins.list
        Names of group columns that should be used as the grouping level.
    filter: builtins.list
        Settings for including or excluding values during time series creation.
    new_variables: builtins.list
        New value column that is a combination of two other value columns.
    value_columns_to_save: builtins.list
        Value columns that should be saved.
    missing_value_handler: typing.Literal
        Strategy how to handle missing values during time series creation.
    """
    time_granularity: Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
    start_date: Optional[str] = None
    end_date: Optional[str] = None
    grouping_level: list[str] = []
    filter: list[FilterSettings] = []
    new_variables: list[NewValue] = []
    value_columns_to_save: list[str]
    missing_value_handler: Literal['keepNaN', 'setToZero'] = 'keepNaN'

Model for the time series creation configuration.

Parameters

time_granularity : typing.Literal
Target granularity of the time series.
start_date : typing.Optional
Dates before this date are excluded.
end_date : typing.Optional
Dates after this date are excluded.
grouping_level : builtins.list
Names of group columns that should be used as the grouping level.
filter : builtins.list
Settings for including or excluding values during time series creation.
new_variables : builtins.list
New value column that is a combination of two other value columns.
value_columns_to_save : builtins.list
Value columns that should be saved.
missing_value_handler : typing.Literal
Strategy how to handle missing values during time series creation.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var end_date : str | None
var filter : list[FilterSettings]
var grouping_level : list[str]
var missing_value_handler : Literal['keepNaN', 'setToZero']
var model_config
var new_variables : list[NewValue]
var start_date : str | None
var time_granularity : Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
var value_columns_to_save : list[str]