Module futureexpert.expert_client

Client for connecting with future.

Classes

class ExpertClient (user: Optional[str] = None,
password: Optional[str] = None,
totp: Optional[str] = None,
refresh_token: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None)
Expand source code
class ExpertClient:
    """FutureEXPERT client."""

    def __init__(self,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 totp: Optional[str] = None,
                 refresh_token: Optional[str] = None,
                 group: Optional[str] = None,
                 environment: Optional[Literal['production', 'staging', 'development']] = None) -> None:
        """Initializer.

        Login using either your user credentials or a valid refresh token.

        Parameters
        ----------
        user
            The username for the _future_ platform.
            If not provided, the username is read from environment variable FUTURE_USER.
        password
            The password for the _future_ platform.
            If not provided, the password is read from environment variable FUTURE_PW.
        totp
            Optional second factor for authentication using user credentials.
        refresh_token
            Alternative login using a refresh token only instead of user credentials.
            You can retrieve a long-lived refresh token (offline token) from our identity provider
            using Open ID Connect scope `offline_access` at the token endpoint. Example:
            curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' \
                    -H 'Content-Type: application/x-www-form-urlencoded' \
                    --data-urlencode 'client_id=expert' \
                    --data-urlencode 'grant_type=password' \
                    --data-urlencode 'scope=openid offline_access' \
                    --data-urlencode "username=$FUTURE_USER" \
                    --data-urlencode "password=$FUTURE_PW" | jq .refresh_token
        group
            Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups.
            If not provided, the group is read from the environment variable FUTURE_GROUP.
        environment
            Optionally the _future_ environment to be used, defaults to production environment.
            If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
        """
        future_env = cast(Literal['production', 'staging', 'development'],
                          environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')
        if refresh_token:
            self.api_client = FutureApiClient(refresh_token=refresh_token, environment=future_env)
        else:
            try:
                future_user = user or os.environ['FUTURE_USER']
            except KeyError:
                raise MissingCredentialsError('username') from None
            try:
                future_password = password or os.environ['FUTURE_PW']
            except KeyError:
                raise MissingCredentialsError('password') from None

            self.api_client = FutureApiClient(user=future_user, password=future_password,
                                              environment=future_env, totp=totp)

        authorized_groups = self.api_client.userinfo['groups']
        future_group = group or os.getenv('FUTURE_GROUP')
        if future_group is None and len(authorized_groups) != 1:
            raise ValueError(
                f'You have access to multiple groups. Please select one of the following: {authorized_groups}')
        self.switch_group(new_group=future_group or authorized_groups[0],
                          verbose=future_group is not None)
        self.is_analyst = 'analyst' in self.api_client.user_roles
        self.forecast_core_id = 'forecast-batch-internal' if self.is_analyst else 'forecast-batch'
        self.matcher_core_id = 'cov-selection-internal' if self.is_analyst else 'cov-selection'
        self.associator_core_id = 'associator'
        self.hcfc_core_id = 'hcfc'

    @staticmethod
    def from_dotenv() -> ExpertClient:
        """Create an instance from a .env file or environment variables.

    Parameters
    ----------
    return: futureexpert.expert_client.ExpertClient

    """
        dotenv.load_dotenv()
        return ExpertClient()

    def switch_group(self, new_group: str, verbose: bool = True) -> None:
        """Switches the current group.

        Parameters
        ----------
        new_group: builtins.str
            The name of the group to activate.
        verbose: builtins.bool
            If enabled, shows the group name in the log message.
        return: builtins.NoneType

    """
        if new_group not in self.api_client.userinfo['groups']:
            raise RuntimeError(f'You are not authorized to access group {new_group}')
        self.group = new_group
        verbose_text = f' for group {self.group}' if verbose else ''
        logger.info(f'Successfully logged in{verbose_text}.')

    def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any:
        """Uploads the given raw data for further processing.

        Parameters
        ----------
        source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
            Path to a CSV file or a pandas data frame.
        file_specification: typing.Optional[futureexpert.checkin.FileSpecification]
            If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones.
            The parameter has no effect if source is a path to a CSV file.

        Returns
        -------
        Identifier for the user Inputs.
        return: typing.Any

    """
        df_file = None
        if isinstance(source, pd.DataFrame):
            if not file_specification:
                file_specification = FileSpecification()
            csv = source.to_csv(index=False, sep=file_specification.delimiter,
                                decimal=file_specification.decimal, encoding='utf-8-sig')
            time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
            df_file = (f'expert-{time_stamp}.csv', csv)
            path = None
        else:
            path = source

        # TODO: currently only one file is supported here.
        upload_feedback = self.api_client.upload_user_inputs_for_group(self.group, path, df_file)

        return upload_feedback

    def check_data_definition(self,
                              user_input_id: str,
                              file_uuid: str,
                              data_definition: DataDefinition,
                              file_specification: FileSpecification = FileSpecification()) -> Any:
        """Checks the data definition.

        Removes specified rows and columns. Checks if column values have any issues.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.
        return: typing.Any

    """
        payload = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)

        logger.info('Started data definition using CHECK-IN...')
        result = self.api_client.execute_action(group_id=self.group,
                                                core_id='checkin-preprocessing',
                                                payload=payload,
                                                interval_status_check_in_seconds=2)

        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished data definition.')
        return result

    def create_time_series(self,
                           user_input_id: str,
                           file_uuid: str,
                           data_definition: Optional[DataDefinition] = None,
                           config_ts_creation: Optional[TsCreationConfig] = None,
                           config_checkin: Optional[str] = None,
                           file_specification: FileSpecification = FileSpecification()) -> Any:
        """Last step of the CHECK-IN process which creates the time series.

        Aggregates the data and saves them to the database.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Configuration for the time series creation.
        config_checkin: typing.Optional[builtins.str]
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the _future_ frontend (now.future-forecasting.de).
        return: typing.Any

    """
        logger.info('Transforming input data...')

        if config_ts_creation is None and config_checkin is None:
            raise ValueError('No configuration source is provided.')

        if config_ts_creation is not None and config_checkin is not None:
            raise ValueError('Only one configuration source can be processed.')

        if config_checkin is None and (data_definition is None or config_ts_creation is None):
            raise ValueError(
                'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.')

        if config_ts_creation is not None and data_definition is not None:
            payload_1 = self._create_checkin_payload_1(
                user_input_id, file_uuid, data_definition, file_specification)
            payload = self._create_checkin_payload_2(payload_1, config_ts_creation)
        if config_checkin is not None:
            payload = self._build_payload_from_ui_config(
                user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)

        logger.info('Creating time series using CHECK-IN...')
        result = self.api_client.execute_action(group_id=self.group,
                                                core_id='checkin-preprocessing',
                                                payload=payload,
                                                interval_status_check_in_seconds=2)
        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished time series creation.')

        return result

    def check_in_pool_covs(self,
                           requested_pool_covs: list[PoolCovDefinition],
                           description: Optional[str] = None) -> CheckInPoolResult:
        """Create a new version from a list of pool covariates and version ids.

        Parameters
        ----------
        requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition]
            List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id.
            If no version id is provided, the newest version of the covariate is used.
        description: typing.Optional[builtins.str]
            A short description of the selected covariates.

        Returns
        -------
        Result object with fields version_id and pool_cov_information.
        return: futureexpert.pool.CheckInPoolResult

    """
        logger.info('Transforming input data...')

        payload: dict[str, Any] = {
            'payload': {
                'requested_indicators': [
                    {**covariate.model_dump(exclude_none=True),
                     'indicator_id': covariate.pool_cov_id}
                    for covariate in requested_pool_covs
                ]
            }
        }
        for covariate in payload['payload']['requested_indicators']:
            covariate.pop('pool_cov_id', None)

        payload['payload']['version_description'] = description

        logger.info('Creating time series using checkin-pool...')
        result = self.api_client.execute_action(group_id=self.group,
                                                core_id='checkin-pool',
                                                payload=payload,
                                                interval_status_check_in_seconds=2)

        logger.info('Finished time series creation.')

        return CheckInPoolResult(**result['result'])

    def get_pool_cov_overview(self,
                              granularity: Optional[str] = None,
                              search: Optional[str] = None) -> PoolCovOverview:
        """Gets an overview of all covariates available on POOL according to the given filters.

        Parameters
        ----------
        granularity: typing.Optional[builtins.str]
            If set, returns only data matching that granularity (Day or Month).
        search: typing.Optional[builtins.str]
            If set, performs a full-text search and only returns data found in that search.

        Returns
        -------
        PoolCovOverview object with tables containing the covariates with
        different levels of detail .
        return: futureexpert.pool.PoolCovOverview

    """
        response_json = self.api_client.get_pool_cov_overview(granularity=granularity, search=search)
        return PoolCovOverview(response_json)

    def get_time_series(self,
                        version_id: str) -> CheckInResult:
        """Get time series data. From previously checked-in data.

        Parameters
        ---------
        version_id: builtins.str
            Id of the time series version.
        Returns
        -------
        Id of the time series version. Used to identifiy the time series and the values of the time series.
        return: futureexpert.checkin.CheckInResult

    """
        result = self.api_client.get_ts_data(self.group, version_id)
        return CheckInResult(time_series=[TimeSeries(**ts) for ts in result],
                             version_id=version_id)

    def check_in_time_series(self,
                             raw_data_source: Union[pd.DataFrame, str],
                             data_definition: Optional[DataDefinition] = None,
                             config_ts_creation: Optional[TsCreationConfig] = None,
                             config_checkin: Optional[str] = None,
                             file_specification: FileSpecification = FileSpecification()) -> str:
        """Checks in time series data that can be used as actuals or covariate data.

        Parameters
        ----------
        raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
            Data frame that contains the raw data or path to where the CSV file with the data is stored.
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Specifies the data, value and group columns and which rows and columns are to be removed.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Defines filter and aggreagtion level of the time series.
        config_checkin: typing.Optional[builtins.str]
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        Id of the time series version. Used to identifiy the time series.
        return: builtins.str

    """
        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        response = self.create_time_series(user_input_id=user_input_id,
                                           file_uuid=file_id,
                                           data_definition=data_definition,
                                           config_ts_creation=config_ts_creation,
                                           config_checkin=config_checkin,
                                           file_specification=file_specification)

        return str(response['result']['tsVersion'])

    def _create_checkin_payload_1(self, user_input_id: str,
                                  file_uuid: str,
                                  data_definition: DataDefinition,
                                  file_specification: FileSpecification = FileSpecification()) -> Any:
        """Creates the payload for the CHECK-IN stage prepareDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Specify the format of the CSV file. Only relevant if a CSV was given as input.
        return: typing.Any

    """

        return {'userInputId': user_input_id,
                'payload': {
                    'stage': 'prepareDataset',
                    'fileUuid': file_uuid,
                    'meta': file_specification.model_dump(),
                    'performedTasks': {
                        'removedRows': data_definition.remove_rows,
                        'removedCols': data_definition.remove_columns
                    },
                    'columnDefinition': {
                        'dateColumns': [{snake_to_camel(key): value for key, value in
                                        data_definition.date_column.model_dump(exclude_none=True).items()}],
                        'valueColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()}
                                         for d in data_definition.value_columns],
                        'groupColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()}
                                         for d in data_definition.group_columns]
                    }
                }}

    def _build_payload_from_ui_config(self, user_input_id: str, file_uuid: str, path: str) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        path: builtins.str
            Path to the JSON file.
        return: typing.Any

    """

        with open(path) as file:
            file_data = file.read()
            json_data = json.loads(file_data)

        json_data['stage'] = 'createDataset'
        json_data['fileUuid'] = file_uuid
        del json_data["performedTasksLog"]

        return {'userInputId': user_input_id,
                'payload': json_data}

    def _create_checkin_payload_2(self, payload: dict[str, Any], config: TsCreationConfig) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        payload: builtins.dict[builtins.str, typing.Any]
            Payload used in `create_checkin_payload_1`.
        config: futureexpert.checkin.TsCreationConfig
            Configuration for time series creation.
        return: typing.Any

    """

        payload['payload']['rawDataReviewResults'] = {}
        payload['payload']['timeSeriesDatasetParameter'] = {
            'aggregation': {'operator': 'sum',
                            'option': config.missing_value_handler},
            'date': {
                'timeGranularity': config.time_granularity,
                'startDate': config.start_date,
                'endDate': config.end_date
            },
            'grouping': {
                'dataLevel': config.grouping_level,
                'saveHierarchy': config.save_hierarchy,
                'filter':  [d.model_dump() for d in config.filter]
            },
            'values': [{snake_to_camel(key): value for key, value in d.model_dump().items()} for d in config.new_variables],
            'valueColumnsToSave': config.value_columns_to_save
        }
        payload['payload']['versionDescription'] = config.description
        payload['payload']['stage'] = 'createDataset'

        return payload

    def _create_reconciliation_payload(self, config: MakeForecastConsistentConfiguration) -> Any:
        """Creates the payload for forecast reconciliation.

        Parameters
        ----------
        config: futureexpert.make_forecast_consistent.MakeForecastConsistentConfiguration
            Configuration of the make forecast consistent run.
        return: typing.Any

    """
        config_dict = config.model_dump()
        return {'payload': config_dict}

    def _create_forecast_payload(self, version: str, config: ReportConfig) -> Any:
        """Creates the payload for the forecast.

        Parameters
        ----------
        version: builtins.str
            Version of the time series that should get forecasts.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecast run.
        return: typing.Any

    """

        config_dict = config.model_dump()
        config_dict['actuals_version'] = version
        config_dict['report_note'] = config_dict['title']
        config_dict['cov_selection_report_id'] = config_dict['matcher_report_id']
        config_dict['forecasting']['n_ahead'] = config_dict['forecasting']['fc_horizon']
        config_dict['backtesting'] = config_dict['method_selection']

        if config.rerun_report_id:
            config_dict['base_report_id'] = config.rerun_report_id
            config_dict['report_update_strategy'] = 'KEEP_OWN_RUNS'

            base_report_requested_run_status = ['Successful']
            if 'NoEvaluation' not in config.rerun_status:
                base_report_requested_run_status.append('NoEvaluation')
            config_dict['base_report_requested_run_status'] = base_report_requested_run_status

        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            cast(list[str], config_dict['covs_versions']).append(pool_covs_checkin_result.version_id)
        config_dict.pop('pool_covs')

        config_dict.pop('title')
        config_dict['forecasting'].pop('fc_horizon')
        config_dict.pop('matcher_report_id')
        config_dict.pop('method_selection')
        config_dict.pop('rerun_report_id')
        config_dict.pop('rerun_status')

        payload = {'payload': config_dict}

        return payload

    def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
        """Sarts an associator report.

        Parameters
        ----------
        config: futureexpert.associator.AssociatorConfig
            Configuration of the associator run.

        Returns
        -------
        The identifier of the associator report.
        return: futureexpert.expert_client.ReportIdentifier

    """

        config_dict = config.model_dump()
        payload = {'payload': config_dict}

        result = self.api_client.execute_action(group_id=self.group,
                                                core_id=self.associator_core_id,
                                                payload=payload,
                                                interval_status_check_in_seconds=2)

        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Associator finished')
        return report

    def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
        """Starts a forecasting report.

        Parameters
        ----------
        version: builtins.str
            ID of a time series version.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecasting report.

        Returns
        -------
        The identifier of the forecasting report.
        return: futureexpert.expert_client.ReportIdentifier

    """

        version_data = self.api_client.get_ts_version(self.group, version)
        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=version_data['customer_specific']['granularity'])

        logger.info('Preparing data for forecast...')
        if not self.is_analyst and (config.db_name is not None or config.priority is not None):
            raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
        payload = self._create_forecast_payload(version, config)
        logger.info('Finished data preparation for forecast.')

        logger.info('Started creating forecasting report with FORECAST...')
        result = self.api_client.execute_action(group_id=self.group,
                                                core_id=self.forecast_core_id,
                                                payload=payload,
                                                interval_status_check_in_seconds=2)

        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Forecasts are running...')
        return report

    def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) -> ReportIdentifier:
        """Starts process of making forecasts hierarchically consistent.

        Parameters
        ----------
        config: futureexpert.make_forecast_consistent.MakeForecastConsistentConfiguration
            Configuration of the make forecast consistent run.

        Returns
        -------
        The identifier of the forecasting report.
        return: futureexpert.expert_client.ReportIdentifier

    """

        logger.info('Preparing data for forecast consistency...')
        if not self.is_analyst and (config.db_name is not None):
            raise ValueError('Only users with the role analyst are allowed to use the parameters db_name.')
        payload = self._create_reconciliation_payload(config)
        logger.info('Finished data preparation for forecast consistency.')

        logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
        result = self.api_client.execute_action(group_id=self.group,
                                                core_id=self.hcfc_core_id,
                                                payload=payload,
                                                interval_status_check_in_seconds=2)

        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...')
        return report

    def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
        """Gets the available reports, ordered from newest to oldest.

        Parameters
        ----------
        skip
            The number of initial elements of the report list to skip
        limit
            The limit on the length of the report list

        Returns
        -------
            String representation of the type of one report.
        report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier]

    return: builtins.str

    """
        report_id = report_identifier.report_id if isinstance(
            report_identifier, ReportIdentifier) else report_identifier
        return self.api_client.get_report_type(group_id=self.group, report_id=report_id)

    def get_reports(self, skip: int = 0, limit: int = 100) -> pd.DataFrame:
        """Gets the available reports, ordered from newest to oldest.

        Parameters
        ----------
        skip: builtins.int
            The number of initial elements of the report list to skip: builtins.int
        limit: builtins.int
            The limit on the length of the report list

        Returns
        -------
        The available reports from newest to oldest.
        return: pandas.core.frame.DataFrame

    """
        group_reports = self.api_client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
        vallidated_report_summarys = [ReportSummary.model_validate(report) for report in group_reports]
        return pd.DataFrame([report_summary.model_dump() for report_summary in vallidated_report_summarys])

    def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a forecast or matcher report.

        Parameters
        ----------
        id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.
        include_error_reason: builtins.bool
            Determines whether log messages are to be included in the result.

        return: futureexpert.expert_client.ReportStatus

    """
        fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)
        raw_result = self.api_client.get_report_status(group_id=self.group,
                                                       report_id=fc_identifier.report_id,
                                                       include_error_reason=include_error_reason)

        report_status = raw_result['status_summary']
        created = report_status.get('Created', 0)
        successful = report_status.get('Successful', 0)
        noeval = report_status.get('NoEvaluation', 0)
        error = report_status.get('Error', 0)
        summary = ReportStatusProgress(requested=created,
                                       pending=created - successful - noeval - error,
                                       finished=successful + noeval + error)
        results = ReportStatusResults(successful=successful,
                                      no_evaluation=noeval,
                                      error=error)
        customer_specific = raw_result.get('customer_specific', None)
        assert (customer_specific is None
                or isinstance(customer_specific, dict)), 'unexpected type of customer_specific property'
        return ReportStatus(id=fc_identifier,
                            progress=summary,
                            results=results,
                            error_reasons=None if customer_specific is None else customer_specific.get('log_messages', None))

    def get_fc_results(self,
                       id: Union[ReportIdentifier, int],
                       include_k_best_models: int = 1,
                       include_backtesting: bool = False,
                       include_discarded_models: bool = False) -> list[ForecastResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
            Forecast identifier or plain report ID.
        include_k_best_models: builtins.int
            Number of k best models for which results are to be returned.
        include_backtesting: builtins.bool
            Determines whether backtesting results are to be returned.
        include_discarded_models: builtins.bool
            Determines if models excluded from ranking should be included in the result.
        return: builtins.list[futureexpert.forecast.ForecastResult]

    """

        if include_k_best_models < 1:
            raise ValueError('At least one model is needed.')

        if self.get_report_type(report_identifier=id) not in ['forecast', 'MongoForecastingResultSink']:
            raise ValueError('The given report ID does not belong to a FORECAST result. ' +
                             'Please input a different ID or use get_matcher_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.api_client.get_fc_results(group_id=self.group,
                                                 report_id=report_id,
                                                 include_k_best_models=include_k_best_models,
                                                 include_backtesting=include_backtesting,
                                                 include_discarded_models=include_discarded_models)

        return [ForecastResult(**result) for result in results]

    def get_consistent_forecast_results(self,
                                        id: Union[ReportIdentifier, int]
                                        ) -> ConsistentForecastResult:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.
        return: futureexpert.make_forecast_consistent.ConsistentForecastResult

    """

        if self.get_report_type(report_identifier=id) != 'hierarchical-forecast':
            raise ValueError('The given report ID does not belong to a reconciled forecast result. ' +
                             'Please input a different ID.')
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id
        results = self.api_client.get_hierarchical_fc_results(group_id=self.group,
                                                              report_id=report_id)
        return ConsistentForecastResult(**results)

    def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.
        return: futureexpert.make_forecast_consistent.ConsistentForecastResult

    """

        if self.get_report_type(report_identifier=id) not in ['matcher', 'CovariateSelection']:
            raise ValueError('The given report ID does not belong to a MATCHER result. ' +
                             'Please input a different ID or use get_fc_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.api_client.get_matcher_results(group_id=self.group,
                                                      report_id=report_id)

        return [MatcherResult(**result) for result in results]

    def get_associator_results(self, id: Union[ReportIdentifier, int]) -> AssociatorResult:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
            Report identifier or plain report ID.
        return: futureexpert.make_forecast_consistent.ConsistentForecastResult

    """

        if self.get_report_type(report_identifier=id) != 'associator':
            raise ValueError('The given report ID does not belong to an ASSOCIATOR result. ' +
                             'Please input a different ID.')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        result = self.api_client.get_associator_results(group_id=self.group,
                                                        report_id=report_id)

        associator_result = result[0]
        associator_result['input'] = self.api_client.get_ts_data(self.group, associator_result.get('actuals'))
        associator_result.pop('actuals')
        return AssociatorResult(**associator_result)

    def get_ts_versions(self, skip: int = 0, limit: int = 100) -> pd.DataFrame:
        """Gets the available time series version, ordered from newest to oldest.
            keep_until_utc shows the last day where the data is stored.

        Parameters
        ----------
        skip: builtins.int
            The number of initial elements of the version list to skip: builtins.int
        limit: builtins.int
            The limit on the length of the versjion list

        Returns
        -------
        Overview of the available time series versions.
        return: pandas.core.frame.DataFrame

    """
        results = self.api_client.get_group_ts_versions(self.group, skip, limit)
        transformed_results = []
        for version in results:
            transformed_results.append(TimeSeriesVersion(
                version_id=version['_id'],
                description=version.get('description', None),
                creation_time_utc=version.get('creation_time_utc', None),
                keep_until_utc=version['customer_specific'].get('keep_until_utc', None)
            ))
        transformed_results.sort(key=lambda x: x.creation_time_utc, reverse=True)

        return pd.DataFrame([res.model_dump() for res in transformed_results])

    def start_forecast_from_raw_data(self,
                                     raw_data_source: Union[pd.DataFrame, str],
                                     config_fc: ReportConfig,
                                     data_definition: Optional[DataDefinition] = None,
                                     config_ts_creation: Optional[TsCreationConfig] = None,
                                     config_checkin: Optional[str] = None,
                                     file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
        """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

        Parameters
        ----------
        raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
            A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
        config_fc: futureexpert.forecast.ReportConfig
            The configuration of the forecast run.
        data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
            Specifies the data, value and group columns and which rows and columns should be removed.
        config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
            Defines filter and aggreagtion level of the time series.
        config_checkin: typing.Optional[builtins.str]
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The identifier of the forecasting report.
        return: futureexpert.expert_client.ReportIdentifier

    """

        assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.'

        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        res2 = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

        version = res2['result']['tsVersion']
        return self.start_forecast(version=version, config=config_fc)

    def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
        """Starts a covariate matcher report.

        Parameters
        ----------
        version
            ID of a time series version
        config: futureexpert.matcher.MatcherConfig
            Configuration of the covariate matcher report.

        Returns
        -------
        The identifier of the covariate matcher report.
        return: futureexpert.expert_client.ReportIdentifier

    """

        version_data = self.api_client.get_ts_version(self.group, config.actuals_version)
        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=version_data['customer_specific']['granularity'])

        if not self.is_analyst and config.db_name is not None:
            raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')

        payload = self._create_matcher_payload(config)

        result = self.api_client.execute_action(group_id=self.group,
                                                core_id=self.matcher_core_id,
                                                payload=payload,
                                                interval_status_check_in_seconds=2)
        report = ReportIdentifier.model_validate(result)
        logger.info(f'Report created with ID {report.report_id}. Matching indicators...')
        return report

    def _create_matcher_payload(self, config: MatcherConfig) -> Any:
        """Converts the MatcherConfig into the payload needed for the cov-selection core.

    Parameters
    ----------
    config: futureexpert.matcher.MatcherConfig

    return: typing.Any

    """
        all_covs_versions = config.covs_versions
        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            all_covs_versions.append(pool_covs_checkin_result.version_id)

        base_report_requested_run_status = ['Successful']
        if 'NoEvaluation' not in config.rerun_status:
            base_report_requested_run_status.append('NoEvaluation')

        config_dict: dict[str, Any] = {
            'report_description': config.title,
            'db_name': config.db_name,
            'data_config': {
                'actuals_version': config.actuals_version,
                'actuals_filter': config.actuals_filter,
                'covs_versions': all_covs_versions,
                'covs_filter': config.covs_filter,
            },
            "compute_config": {
                "evaluation_start_date": config.evaluation_start_date,
                "evaluation_end_date": config.evaluation_end_date,
                'max_ts_len': config.max_ts_len,
                "base_report_id": config.rerun_report_id,
                "base_report_requested_run_status": base_report_requested_run_status,
                "report_update_strategy": 'KEEP_OWN_RUNS',
                "cov_names": {
                    'cov_name_prefix': '',
                    'cov_name_field': 'name',
                    'cov_name_suffix': '',
                },
                "preselection": {
                    "num_obs_short_term_class": 36,
                    "max_publication_lag": config.max_publication_lag,
                },
                "postselection": {
                    "num_obs_short_term_correlation": 60,
                    "associator_report_id": config.associator_report_id,
                    "use_clustering_results": config.use_clustering_results,
                    "post_selection_queries": config.post_selection_queries,
                    "post_selection_concatenation_operator": "&",
                    "protected_selections_queries": [],
                    "protected_selections_concatenation_operator": "&"
                },
                "enable_leading_covariate_selection": config.enable_leading_covariate_selection,
                "fixed_season_length": config.fixed_season_length,
                "lag_selection": {
                    "fixed_lags": config.lag_selection.fixed_lags,
                    "min_lag": config.lag_selection.min_lag,
                    "max_lag": config.lag_selection.max_lag,
                }
            }
        }

        return {'payload': config_dict}

FutureEXPERT client.

Initializer.

Login using either your user credentials or a valid refresh token.

Parameters

user
The username for the future platform. If not provided, the username is read from environment variable FUTURE_USER.
password
The password for the future platform. If not provided, the password is read from environment variable FUTURE_PW.
totp
Optional second factor for authentication using user credentials.
refresh_token
Alternative login using a refresh token only instead of user credentials. You can retrieve a long-lived refresh token (offline token) from our identity provider using Open ID Connect scope offline_access at the token endpoint. Example: curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' -H 'Content-Type: application/x-www-form-urlencoded' –data-urlencode 'client_id=expert' –data-urlencode 'grant_type=password' –data-urlencode 'scope=openid offline_access' –data-urlencode "username=$FUTURE_USER" –data-urlencode "password=$FUTURE_PW" | jq .refresh_token
group
Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
environment
Optionally the future environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.

Static methods

def from_dotenv() ‑> ExpertClient
Expand source code
@staticmethod
def from_dotenv() -> ExpertClient:
    """Create an instance from a .env file or environment variables.

Parameters
----------
return: futureexpert.expert_client.ExpertClient

"""
    dotenv.load_dotenv()
    return ExpertClient()

Create an instance from a .env file or environment variables.

Parameters

return : ExpertClient
 

Methods

def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
def check_data_definition(self,
                          user_input_id: str,
                          file_uuid: str,
                          data_definition: DataDefinition,
                          file_specification: FileSpecification = FileSpecification()) -> Any:
    """Checks the data definition.

    Removes specified rows and columns. Checks if column values have any issues.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.
    return: typing.Any

"""
    payload = self._create_checkin_payload_1(
        user_input_id, file_uuid, data_definition, file_specification)

    logger.info('Started data definition using CHECK-IN...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

    logger.info('Finished data definition.')
    return result

Checks the data definition.

Removes specified rows and columns. Checks if column values have any issues.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : DataDefinition
Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.
return : typing.Any
 
def check_in_pool_covs(self,
requested_pool_covs: list[PoolCovDefinition],
description: Optional[str] = None) ‑> CheckInPoolResult
Expand source code
def check_in_pool_covs(self,
                       requested_pool_covs: list[PoolCovDefinition],
                       description: Optional[str] = None) -> CheckInPoolResult:
    """Create a new version from a list of pool covariates and version ids.

    Parameters
    ----------
    requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition]
        List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id.
        If no version id is provided, the newest version of the covariate is used.
    description: typing.Optional[builtins.str]
        A short description of the selected covariates.

    Returns
    -------
    Result object with fields version_id and pool_cov_information.
    return: futureexpert.pool.CheckInPoolResult

"""
    logger.info('Transforming input data...')

    payload: dict[str, Any] = {
        'payload': {
            'requested_indicators': [
                {**covariate.model_dump(exclude_none=True),
                 'indicator_id': covariate.pool_cov_id}
                for covariate in requested_pool_covs
            ]
        }
    }
    for covariate in payload['payload']['requested_indicators']:
        covariate.pop('pool_cov_id', None)

    payload['payload']['version_description'] = description

    logger.info('Creating time series using checkin-pool...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-pool',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

    logger.info('Finished time series creation.')

    return CheckInPoolResult(**result['result'])

Create a new version from a list of pool covariates and version ids.

Parameters

requested_pool_covs : builtins.list[PoolCovDefinition]
List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used.
description : typing.Optional[builtins.str]
A short description of the selected covariates.

Returns

Result object with fields version_id and pool_cov_information.
return : CheckInPoolResult
 
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> str
Expand source code
def check_in_time_series(self,
                         raw_data_source: Union[pd.DataFrame, str],
                         data_definition: Optional[DataDefinition] = None,
                         config_ts_creation: Optional[TsCreationConfig] = None,
                         config_checkin: Optional[str] = None,
                         file_specification: FileSpecification = FileSpecification()) -> str:
    """Checks in time series data that can be used as actuals or covariate data.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
        Data frame that contains the raw data or path to where the CSV file with the data is stored.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns are to be removed.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Defines filter and aggreagtion level of the time series.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    Id of the time series version. Used to identifiy the time series.
    return: builtins.str

"""
    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    response = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

    return str(response['result']['tsVersion'])

Checks in time series data that can be used as actuals or covariate data.

Parameters

raw_data_source : typing.Union[pandas.core.frame.DataFrame, builtins.str]
Data frame that contains the raw data or path to where the CSV file with the data is stored.
data_definition : typing.Optional[DataDefinition]
Specifies the data, value and group columns and which rows and columns are to be removed.
config_ts_creation : typing.Optional[TsCreationConfig]
Defines filter and aggreagtion level of the time series.
config_checkin : typing.Optional[builtins.str]
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

Id of the time series version. Used to identifiy the time series.
return : builtins.str
 
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
def create_time_series(self,
                       user_input_id: str,
                       file_uuid: str,
                       data_definition: Optional[DataDefinition] = None,
                       config_ts_creation: Optional[TsCreationConfig] = None,
                       config_checkin: Optional[str] = None,
                       file_specification: FileSpecification = FileSpecification()) -> Any:
    """Last step of the CHECK-IN process which creates the time series.

    Aggregates the data and saves them to the database.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Configuration for the time series creation.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the _future_ frontend (now.future-forecasting.de).
    return: typing.Any

"""
    logger.info('Transforming input data...')

    if config_ts_creation is None and config_checkin is None:
        raise ValueError('No configuration source is provided.')

    if config_ts_creation is not None and config_checkin is not None:
        raise ValueError('Only one configuration source can be processed.')

    if config_checkin is None and (data_definition is None or config_ts_creation is None):
        raise ValueError(
            'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.')

    if config_ts_creation is not None and data_definition is not None:
        payload_1 = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)
        payload = self._create_checkin_payload_2(payload_1, config_ts_creation)
    if config_checkin is not None:
        payload = self._build_payload_from_ui_config(
            user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)

    logger.info('Creating time series using CHECK-IN...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

    logger.info('Finished time series creation.')

    return result

Last step of the CHECK-IN process which creates the time series.

Aggregates the data and saves them to the database.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : typing.Optional[DataDefinition]
Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.
config_ts_creation : typing.Optional[TsCreationConfig]
Configuration for the time series creation.
config_checkin : typing.Optional[builtins.str]
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
return : typing.Any
 
def get_associator_results(self,
id: Union[ReportIdentifier, int]) ‑> AssociatorResult
Expand source code
def get_associator_results(self, id: Union[ReportIdentifier, int]) -> AssociatorResult:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    return: futureexpert.make_forecast_consistent.ConsistentForecastResult

"""

    if self.get_report_type(report_identifier=id) != 'associator':
        raise ValueError('The given report ID does not belong to an ASSOCIATOR result. ' +
                         'Please input a different ID.')

    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    result = self.api_client.get_associator_results(group_id=self.group,
                                                    report_id=report_id)

    associator_result = result[0]
    associator_result['input'] = self.api_client.get_ts_data(self.group, associator_result.get('actuals'))
    associator_result.pop('actuals')
    return AssociatorResult(**associator_result)

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
return : ConsistentForecastResult
 
def get_consistent_forecast_results(self,
id: Union[ReportIdentifier, int]) ‑> ConsistentForecastResult
Expand source code
def get_consistent_forecast_results(self,
                                    id: Union[ReportIdentifier, int]
                                    ) -> ConsistentForecastResult:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    return: futureexpert.make_forecast_consistent.ConsistentForecastResult

"""

    if self.get_report_type(report_identifier=id) != 'hierarchical-forecast':
        raise ValueError('The given report ID does not belong to a reconciled forecast result. ' +
                         'Please input a different ID.')
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id
    results = self.api_client.get_hierarchical_fc_results(group_id=self.group,
                                                          report_id=report_id)
    return ConsistentForecastResult(**results)

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
return : ConsistentForecastResult
 
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False,
include_discarded_models: bool = False) ‑> list[ForecastResult]
Expand source code
def get_fc_results(self,
                   id: Union[ReportIdentifier, int],
                   include_k_best_models: int = 1,
                   include_backtesting: bool = False,
                   include_discarded_models: bool = False) -> list[ForecastResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
        Forecast identifier or plain report ID.
    include_k_best_models: builtins.int
        Number of k best models for which results are to be returned.
    include_backtesting: builtins.bool
        Determines whether backtesting results are to be returned.
    include_discarded_models: builtins.bool
        Determines if models excluded from ranking should be included in the result.
    return: builtins.list[futureexpert.forecast.ForecastResult]

"""

    if include_k_best_models < 1:
        raise ValueError('At least one model is needed.')

    if self.get_report_type(report_identifier=id) not in ['forecast', 'MongoForecastingResultSink']:
        raise ValueError('The given report ID does not belong to a FORECAST result. ' +
                         'Please input a different ID or use get_matcher_results().')

    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    results = self.api_client.get_fc_results(group_id=self.group,
                                             report_id=report_id,
                                             include_k_best_models=include_k_best_models,
                                             include_backtesting=include_backtesting,
                                             include_discarded_models=include_discarded_models)

    return [ForecastResult(**result) for result in results]

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Forecast identifier or plain report ID.
include_k_best_models : builtins.int
Number of k best models for which results are to be returned.
include_backtesting : builtins.bool
Determines whether backtesting results are to be returned.
include_discarded_models : builtins.bool
Determines if models excluded from ranking should be included in the result.
return : builtins.list[ForecastResult]
 
def get_matcher_results(self,
id: Union[ReportIdentifier, int]) ‑> list[MatcherResult]
Expand source code
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    return: futureexpert.make_forecast_consistent.ConsistentForecastResult

"""

    if self.get_report_type(report_identifier=id) not in ['matcher', 'CovariateSelection']:
        raise ValueError('The given report ID does not belong to a MATCHER result. ' +
                         'Please input a different ID or use get_fc_results().')

    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    results = self.api_client.get_matcher_results(group_id=self.group,
                                                  report_id=report_id)

    return [MatcherResult(**result) for result in results]

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
return : ConsistentForecastResult
 
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview
Expand source code
def get_pool_cov_overview(self,
                          granularity: Optional[str] = None,
                          search: Optional[str] = None) -> PoolCovOverview:
    """Gets an overview of all covariates available on POOL according to the given filters.

    Parameters
    ----------
    granularity: typing.Optional[builtins.str]
        If set, returns only data matching that granularity (Day or Month).
    search: typing.Optional[builtins.str]
        If set, performs a full-text search and only returns data found in that search.

    Returns
    -------
    PoolCovOverview object with tables containing the covariates with
    different levels of detail .
    return: futureexpert.pool.PoolCovOverview

"""
    response_json = self.api_client.get_pool_cov_overview(granularity=granularity, search=search)
    return PoolCovOverview(response_json)

Gets an overview of all covariates available on POOL according to the given filters.

Parameters

granularity : typing.Optional[builtins.str]
If set, returns only data matching that granularity (Day or Month).
search : typing.Optional[builtins.str]
If set, performs a full-text search and only returns data found in that search.

Returns

PoolCovOverview object with tables containing the covariates with
 
different levels of detail .
return : PoolCovOverview
 
def get_report_status(self,
id: Union[ReportIdentifier, int],
include_error_reason: bool = True) ‑> ReportStatus
Expand source code
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
    """Gets the current status of a forecast or matcher report.

    Parameters
    ----------
    id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    include_error_reason: builtins.bool
        Determines whether log messages are to be included in the result.

    return: futureexpert.expert_client.ReportStatus

"""
    fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)
    raw_result = self.api_client.get_report_status(group_id=self.group,
                                                   report_id=fc_identifier.report_id,
                                                   include_error_reason=include_error_reason)

    report_status = raw_result['status_summary']
    created = report_status.get('Created', 0)
    successful = report_status.get('Successful', 0)
    noeval = report_status.get('NoEvaluation', 0)
    error = report_status.get('Error', 0)
    summary = ReportStatusProgress(requested=created,
                                   pending=created - successful - noeval - error,
                                   finished=successful + noeval + error)
    results = ReportStatusResults(successful=successful,
                                  no_evaluation=noeval,
                                  error=error)
    customer_specific = raw_result.get('customer_specific', None)
    assert (customer_specific is None
            or isinstance(customer_specific, dict)), 'unexpected type of customer_specific property'
    return ReportStatus(id=fc_identifier,
                        progress=summary,
                        results=results,
                        error_reasons=None if customer_specific is None else customer_specific.get('log_messages', None))

Gets the current status of a forecast or matcher report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
include_error_reason : builtins.bool
Determines whether log messages are to be included in the result.
return : ReportStatus
 
def get_report_type(self,
report_identifier: Union[int, ReportIdentifier]) ‑> str
Expand source code
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
    """Gets the available reports, ordered from newest to oldest.

    Parameters
    ----------
    skip
        The number of initial elements of the report list to skip
    limit
        The limit on the length of the report list

    Returns
    -------
        String representation of the type of one report.
    report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier]

return: builtins.str

"""
    report_id = report_identifier.report_id if isinstance(
        report_identifier, ReportIdentifier) else report_identifier
    return self.api_client.get_report_type(group_id=self.group, report_id=report_id)

Gets the available reports, ordered from newest to oldest.

Parameters
----------
skip
    The number of initial elements of the report list to skip
limit
    The limit on the length of the report list

Returns
-------
    String representation of the type of one report.
report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier]

return: builtins.str

def get_reports(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame
Expand source code
def get_reports(self, skip: int = 0, limit: int = 100) -> pd.DataFrame:
    """Gets the available reports, ordered from newest to oldest.

    Parameters
    ----------
    skip: builtins.int
        The number of initial elements of the report list to skip: builtins.int
    limit: builtins.int
        The limit on the length of the report list

    Returns
    -------
    The available reports from newest to oldest.
    return: pandas.core.frame.DataFrame

"""
    group_reports = self.api_client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
    vallidated_report_summarys = [ReportSummary.model_validate(report) for report in group_reports]
    return pd.DataFrame([report_summary.model_dump() for report_summary in vallidated_report_summarys])

Gets the available reports, ordered from newest to oldest.

Parameters

skip : builtins.int
The number of initial elements of the report list to skip: builtins.int
limit : builtins.int
The limit on the length of the report list

Returns

The available reports from newest to oldest.
return : pandas.core.frame.DataFrame
 
def get_time_series(self, version_id: str) ‑> CheckInResult
Expand source code
def get_time_series(self,
                    version_id: str) -> CheckInResult:
    """Get time series data. From previously checked-in data.

    Parameters
    ---------
    version_id: builtins.str
        Id of the time series version.
    Returns
    -------
    Id of the time series version. Used to identifiy the time series and the values of the time series.
    return: futureexpert.checkin.CheckInResult

"""
    result = self.api_client.get_ts_data(self.group, version_id)
    return CheckInResult(time_series=[TimeSeries(**ts) for ts in result],
                         version_id=version_id)

Get time series data. From previously checked-in data.

Parameters

version_id : builtins.str
Id of the time series version.

Returns

Id of the time series version. Used to identifiy the time series and the values of the time series.
return : CheckInResult
 
def get_ts_versions(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame
Expand source code
def get_ts_versions(self, skip: int = 0, limit: int = 100) -> pd.DataFrame:
    """Gets the available time series version, ordered from newest to oldest.
        keep_until_utc shows the last day where the data is stored.

    Parameters
    ----------
    skip: builtins.int
        The number of initial elements of the version list to skip: builtins.int
    limit: builtins.int
        The limit on the length of the versjion list

    Returns
    -------
    Overview of the available time series versions.
    return: pandas.core.frame.DataFrame

"""
    results = self.api_client.get_group_ts_versions(self.group, skip, limit)
    transformed_results = []
    for version in results:
        transformed_results.append(TimeSeriesVersion(
            version_id=version['_id'],
            description=version.get('description', None),
            creation_time_utc=version.get('creation_time_utc', None),
            keep_until_utc=version['customer_specific'].get('keep_until_utc', None)
        ))
    transformed_results.sort(key=lambda x: x.creation_time_utc, reverse=True)

    return pd.DataFrame([res.model_dump() for res in transformed_results])

Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored.

Parameters

skip : builtins.int
The number of initial elements of the version list to skip: builtins.int
limit : builtins.int
The limit on the length of the versjion list

Returns

Overview of the available time series versions.
return : pandas.core.frame.DataFrame
 
def start_associator(self, config: AssociatorConfig) ‑> ReportIdentifier
Expand source code
def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
    """Sarts an associator report.

    Parameters
    ----------
    config: futureexpert.associator.AssociatorConfig
        Configuration of the associator run.

    Returns
    -------
    The identifier of the associator report.
    return: futureexpert.expert_client.ReportIdentifier

"""

    config_dict = config.model_dump()
    payload = {'payload': config_dict}

    result = self.api_client.execute_action(group_id=self.group,
                                            core_id=self.associator_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Associator finished')
    return report

Sarts an associator report.

Parameters

config : AssociatorConfig
Configuration of the associator run.

Returns

The identifier of the associator report.
return : ReportIdentifier
 
def start_forecast(self, version: str, config: ReportConfig) ‑> ReportIdentifier
Expand source code
def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
    """Starts a forecasting report.

    Parameters
    ----------
    version: builtins.str
        ID of a time series version.
    config: futureexpert.forecast.ReportConfig
        Configuration of the forecasting report.

    Returns
    -------
    The identifier of the forecasting report.
    return: futureexpert.expert_client.ReportIdentifier

"""

    version_data = self.api_client.get_ts_version(self.group, version)
    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=version_data['customer_specific']['granularity'])

    logger.info('Preparing data for forecast...')
    if not self.is_analyst and (config.db_name is not None or config.priority is not None):
        raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
    payload = self._create_forecast_payload(version, config)
    logger.info('Finished data preparation for forecast.')

    logger.info('Started creating forecasting report with FORECAST...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id=self.forecast_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Forecasts are running...')
    return report

Starts a forecasting report.

Parameters

version : builtins.str
ID of a time series version.
config : ReportConfig
Configuration of the forecasting report.

Returns

The identifier of the forecasting report.
return : ReportIdentifier
 
def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier
Expand source code
def start_forecast_from_raw_data(self,
                                 raw_data_source: Union[pd.DataFrame, str],
                                 config_fc: ReportConfig,
                                 data_definition: Optional[DataDefinition] = None,
                                 config_ts_creation: Optional[TsCreationConfig] = None,
                                 config_checkin: Optional[str] = None,
                                 file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
    """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
        A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
    config_fc: futureexpert.forecast.ReportConfig
        The configuration of the forecast run.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns should be removed.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Defines filter and aggreagtion level of the time series.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The identifier of the forecasting report.
    return: futureexpert.expert_client.ReportIdentifier

"""

    assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.'

    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    res2 = self.create_time_series(user_input_id=user_input_id,
                                   file_uuid=file_id,
                                   data_definition=data_definition,
                                   config_ts_creation=config_ts_creation,
                                   config_checkin=config_checkin,
                                   file_specification=file_specification)

    version = res2['result']['tsVersion']
    return self.start_forecast(version=version, config=config_fc)

Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

Parameters

raw_data_source : typing.Union[pandas.core.frame.DataFrame, builtins.str]
A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc : ReportConfig
The configuration of the forecast run.
data_definition : typing.Optional[DataDefinition]
Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation : typing.Optional[TsCreationConfig]
Defines filter and aggreagtion level of the time series.
config_checkin : typing.Optional[builtins.str]
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

The identifier of the forecasting report.
return : ReportIdentifier
 
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) ‑> ReportIdentifier
Expand source code
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) -> ReportIdentifier:
    """Starts process of making forecasts hierarchically consistent.

    Parameters
    ----------
    config: futureexpert.make_forecast_consistent.MakeForecastConsistentConfiguration
        Configuration of the make forecast consistent run.

    Returns
    -------
    The identifier of the forecasting report.
    return: futureexpert.expert_client.ReportIdentifier

"""

    logger.info('Preparing data for forecast consistency...')
    if not self.is_analyst and (config.db_name is not None):
        raise ValueError('Only users with the role analyst are allowed to use the parameters db_name.')
    payload = self._create_reconciliation_payload(config)
    logger.info('Finished data preparation for forecast consistency.')

    logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id=self.hcfc_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...')
    return report

Starts process of making forecasts hierarchically consistent.

Parameters

config : MakeForecastConsistentConfiguration
Configuration of the make forecast consistent run.

Returns

The identifier of the forecasting report.
return : ReportIdentifier
 
def start_matcher(self, config: MatcherConfig) ‑> ReportIdentifier
Expand source code
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
    """Starts a covariate matcher report.

    Parameters
    ----------
    version
        ID of a time series version
    config: futureexpert.matcher.MatcherConfig
        Configuration of the covariate matcher report.

    Returns
    -------
    The identifier of the covariate matcher report.
    return: futureexpert.expert_client.ReportIdentifier

"""

    version_data = self.api_client.get_ts_version(self.group, config.actuals_version)
    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=version_data['customer_specific']['granularity'])

    if not self.is_analyst and config.db_name is not None:
        raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')

    payload = self._create_matcher_payload(config)

    result = self.api_client.execute_action(group_id=self.group,
                                            core_id=self.matcher_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Matching indicators...')
    return report

Starts a covariate matcher report.

Parameters

version
ID of a time series version
config : MatcherConfig
Configuration of the covariate matcher report.

Returns

The identifier of the covariate matcher report.
return : ReportIdentifier
 
def switch_group(self, new_group: str, verbose: bool = True) ‑> None
Expand source code
def switch_group(self, new_group: str, verbose: bool = True) -> None:
    """Switches the current group.

    Parameters
    ----------
    new_group: builtins.str
        The name of the group to activate.
    verbose: builtins.bool
        If enabled, shows the group name in the log message.
    return: builtins.NoneType

"""
    if new_group not in self.api_client.userinfo['groups']:
        raise RuntimeError(f'You are not authorized to access group {new_group}')
    self.group = new_group
    verbose_text = f' for group {self.group}' if verbose else ''
    logger.info(f'Successfully logged in{verbose_text}.')

Switches the current group.

Parameters

new_group : builtins.str
The name of the group to activate.
verbose : builtins.bool
If enabled, shows the group name in the log message.
return : builtins.NoneType
 
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any
Expand source code
def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any:
    """Uploads the given raw data for further processing.

    Parameters
    ----------
    source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
        Path to a CSV file or a pandas data frame.
    file_specification: typing.Optional[futureexpert.checkin.FileSpecification]
        If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones.
        The parameter has no effect if source is a path to a CSV file.

    Returns
    -------
    Identifier for the user Inputs.
    return: typing.Any

"""
    df_file = None
    if isinstance(source, pd.DataFrame):
        if not file_specification:
            file_specification = FileSpecification()
        csv = source.to_csv(index=False, sep=file_specification.delimiter,
                            decimal=file_specification.decimal, encoding='utf-8-sig')
        time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
        df_file = (f'expert-{time_stamp}.csv', csv)
        path = None
    else:
        path = source

    # TODO: currently only one file is supported here.
    upload_feedback = self.api_client.upload_user_inputs_for_group(self.group, path, df_file)

    return upload_feedback

Uploads the given raw data for further processing.

Parameters

source : typing.Union[pandas.core.frame.DataFrame, builtins.str]
Path to a CSV file or a pandas data frame.
file_specification : typing.Optional[FileSpecification]
If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file.

Returns

Identifier for the user Inputs.
return : typing.Any
 
class MissingCredentialsError (missing_credential_type: str)
Expand source code
class MissingCredentialsError(RuntimeError):
    def __init__(self, missing_credential_type: str) -> None:
        super().__init__(f'Please enter {missing_credential_type} either when ' +
                         'initializing the expert client or in the the .env file!')

Unspecified run-time error.

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException
class ReportIdentifier (**data: Any)
Expand source code
class ReportIdentifier(pydantic.BaseModel):
    """Report ID and Settings ID of a report. Required to identify the report, e.g. when retrieving the results.

    Parameters
    ----------
    report_id: builtins.int

    settings_id: typing.Optional[builtins.int]

    """
    report_id: int
    settings_id: Optional[int]

Report ID and Settings ID of a report. Required to identify the report, e.g. when retrieving the results.

Parameters

report_id : builtins.int
 
settings_id : typing.Optional[builtins.int]
 

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var report_id : int
var settings_id : int | None
class ReportStatus (**data: Any)
Expand source code
class ReportStatus(pydantic.BaseModel):
    """Status of a forecast or matcher report.

    Parameters
    ----------
    id: futureexpert.expert_client.ReportIdentifier

    progress: futureexpert.expert_client.ReportStatusProgress

    results: futureexpert.expert_client.ReportStatusResults

    error_reasons: typing.Optional[typing.Any]

    """
    id: ReportIdentifier
    progress: ReportStatusProgress
    results: ReportStatusResults
    error_reasons: Optional[Any] = None

    @property
    def is_finished(self) -> bool:
        """Indicates whether a forecasting report is finished."""
        return self.progress.pending == 0

    def print(self) -> None:
        """Prints a summary of the status.

    Parameters
    ----------
    return: builtins.NoneType

    """
        title = f'Status forecasting report for id: {self.id}'

        if self.progress.requested == 0:
            print(f'{title}\n  No run was created')
            return

        pct_txt = f'{round(self.progress.finished/self.progress.requested*100)} % are finished'
        overall = f'{self.progress.requested} time series requested for calculation'
        finished_txt = f'{self.progress.finished} time series finished'
        noeval_txt = f'{self.results.no_evaluation} time series without evaluation'
        error_txt = f'{self.results.error} time series ran into an error'
        print(f'{title}\n {pct_txt} \n {overall} \n {finished_txt} \n {noeval_txt} \n {error_txt}')

Status of a forecast or matcher report.

Parameters

id : ReportIdentifier
 
progress : ReportStatusProgress
 
results : ReportStatusResults
 
error_reasons : typing.Optional[typing.Any]
 

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error_reasons : Any | None
var idReportIdentifier
var model_config
var progressReportStatusProgress
var resultsReportStatusResults

Instance variables

prop is_finished : bool
Expand source code
@property
def is_finished(self) -> bool:
    """Indicates whether a forecasting report is finished."""
    return self.progress.pending == 0

Indicates whether a forecasting report is finished.

Methods

def print(self) ‑> None
Expand source code
def print(self) -> None:
    """Prints a summary of the status.

Parameters
----------
return: builtins.NoneType

"""
    title = f'Status forecasting report for id: {self.id}'

    if self.progress.requested == 0:
        print(f'{title}\n  No run was created')
        return

    pct_txt = f'{round(self.progress.finished/self.progress.requested*100)} % are finished'
    overall = f'{self.progress.requested} time series requested for calculation'
    finished_txt = f'{self.progress.finished} time series finished'
    noeval_txt = f'{self.results.no_evaluation} time series without evaluation'
    error_txt = f'{self.results.error} time series ran into an error'
    print(f'{title}\n {pct_txt} \n {overall} \n {finished_txt} \n {noeval_txt} \n {error_txt}')

Prints a summary of the status.

Parameters

return : builtins.NoneType
 
class ReportStatusProgress (**data: Any)
Expand source code
class ReportStatusProgress(pydantic.BaseModel):
    """Progress of a forecasting report.

    Parameters
    ----------
    requested: builtins.int

    pending: builtins.int

    finished: builtins.int

    """
    requested: int
    pending: int
    finished: int

Progress of a forecasting report.

Parameters

requested : builtins.int
 
pending : builtins.int
 
finished : builtins.int
 

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var finished : int
var model_config
var pending : int
var requested : int
class ReportStatusResults (**data: Any)
Expand source code
class ReportStatusResults(pydantic.BaseModel):
    """Result status of a forecasting report.

    This only includes runs that are already finished.

    Parameters
    ----------
    successful: builtins.int

    no_evaluation: builtins.int

    error: builtins.int

    """
    successful: int
    no_evaluation: int
    error: int

Result status of a forecasting report.

This only includes runs that are already finished.

Parameters

successful : builtins.int
 
no_evaluation : builtins.int
 
error : builtins.int
 

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : int
var model_config
var no_evaluation : int
var successful : int
class ReportSummary (**data: Any)
Expand source code
class ReportSummary(pydantic.BaseModel):
    """Report ID and description of a report.

    Parameters
    ----------
    report_id: builtins.int

    description: builtins.str

    result_type: builtins.str

    """
    report_id: int
    description: str
    result_type: str

Report ID and description of a report.

Parameters

report_id : builtins.int
 
description : builtins.str
 
result_type : builtins.str
 

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var description : str
var model_config
var report_id : int
var result_type : str