Module futureexpert.expert_client
Client for connecting with future.
Classes
class ChainedReportIdentifier (**data: Any)-
Expand source code
class ChainedReportIdentifier(ReportIdentifier): """Extended report identifier with prerequisites. Parameters ---------- report_id: builtins.int settings_id: typing.Optional[builtins.int] prerequisites: builtins.list[futureexpert.expert_client.ReportIdentifier] """ prerequisites: list[ReportIdentifier] @classmethod def of(cls, final_report_identifier: ReportIdentifier, prerequisites: list[ReportIdentifier]) -> ChainedReportIdentifier: return cls(report_id=final_report_identifier.report_id, settings_id=final_report_identifier.settings_id, prerequisites=prerequisites)Extended report identifier with prerequisites.
Parameters
report_id:builtins.intsettings_id:typing.Optional[builtins.int]prerequisites:builtins.list[ReportIdentifier]
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- ReportIdentifier
- pydantic.main.BaseModel
Class variables
var model_configvar prerequisites : list[ReportIdentifier]
Static methods
def of(final_report_identifier: ReportIdentifier,
prerequisites: list[ReportIdentifier]) ‑> ChainedReportIdentifier
class ErrorReason (**data: Any)-
Expand source code
class ErrorReason(pydantic.BaseModel): """Details about a specific error in a report. Parameters ---------- status: builtins.str The status of the run ('Error' or 'NoEvaluation'). error_message: typing.Optional[builtins.str] The error message describing what went wrong. timeseries: builtins.list[builtins.str] List of time series names that encountered this error. """ status: str error_message: Optional[str] timeseries: list[str] @staticmethod def parse_error_reasons(customer_specific: dict[str, Any]) -> list[ErrorReason]: """Creates error reasons from raw customer_specific object. Parameters ---------- customer_specific: builtins.dict[builtins.str, typing.Any] return: builtins.list[futureexpert.expert_client.ErrorReason] """ log_messages = customer_specific.get('log_messages', None) assert log_messages is not None, 'missing log_messages property in customer_specific' assert isinstance(log_messages, list), 'unexpected type of log_messages' return [ErrorReason.model_validate(msg) for msg in log_messages]Details about a specific error in a report.
Parameters
status:builtins.str- The status of the run ('Error' or 'NoEvaluation').
error_message:typing.Optional[builtins.str]- The error message describing what went wrong.
timeseries:builtins.list[builtins.str]- List of time series names that encountered this error.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var error_message : str | Nonevar model_configvar status : strvar timeseries : list[str]
Static methods
def parse_error_reasons(customer_specific: dict[str, Any]) ‑> list[ErrorReason]-
Expand source code
@staticmethod def parse_error_reasons(customer_specific: dict[str, Any]) -> list[ErrorReason]: """Creates error reasons from raw customer_specific object. Parameters ---------- customer_specific: builtins.dict[builtins.str, typing.Any] return: builtins.list[futureexpert.expert_client.ErrorReason] """ log_messages = customer_specific.get('log_messages', None) assert log_messages is not None, 'missing log_messages property in customer_specific' assert isinstance(log_messages, list), 'unexpected type of log_messages' return [ErrorReason.model_validate(msg) for msg in log_messages]Creates error reasons from raw customer_specific object.
Parameters
customer_specific:builtins.dict[builtins.str, typing.Any]return:builtins.list[ErrorReason]
class ExpertClient (user: Optional[str] = None,
password: Optional[str] = None,
totp: Optional[str] = None,
refresh_token: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None)-
Expand source code
class ExpertClient: """FutureEXPERT client.""" def __init__(self, user: Optional[str] = None, password: Optional[str] = None, totp: Optional[str] = None, refresh_token: Optional[str] = None, group: Optional[str] = None, environment: Optional[Literal['production', 'staging', 'development']] = None) -> None: """Initializer. Login using either your user credentials or a valid refresh token. Parameters ---------- user The username for the _future_ platform. If not provided, the username is read from environment variable FUTURE_USER. password The password for the _future_ platform. If not provided, the password is read from environment variable FUTURE_PW. totp Optional second factor for authentication using user credentials. refresh_token Alternative login using a refresh token only instead of user credentials. If not provided, the token is read from the environment variable FUTURE_REFRESH_TOKEN. You can retrieve a long-lived refresh token (offline token) from our identity provider using Open ID Connect scope `offline_access` at the token endpoint. Example: curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' \ -H 'Content-Type: application/x-www-form-urlencoded' \ --data-urlencode 'client_id=expert' \ --data-urlencode 'grant_type=password' \ --data-urlencode 'scope=openid offline_access' \ --data-urlencode "username=$FUTURE_USER" \ --data-urlencode "password=$FUTURE_PW" | jq .refresh_token group Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP. environment Optionally the _future_ environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT. """ future_env = cast(Literal['production', 'staging', 'development'], environment or os.getenv('FUTURE_ENVIRONMENT') or 'production') future_refresh_token = refresh_token or os.getenv('FUTURE_REFRESH_TOKEN') if future_refresh_token: self.api_client = FutureApiClient(refresh_token=future_refresh_token, environment=future_env) else: try: future_user = user or os.environ['FUTURE_USER'] except KeyError: raise MissingCredentialsError('username') from None try: future_password = password or os.environ['FUTURE_PW'] except KeyError: raise MissingCredentialsError('password') from None self.api_client = FutureApiClient(user=future_user, password=future_password, environment=future_env, totp=totp) authorized_groups = self.api_client.userinfo['groups'] future_group = group or os.getenv('FUTURE_GROUP') if future_group is None and len(authorized_groups) != 1: raise ValueError( f'You have access to multiple groups. Please select one of the following: {authorized_groups}') self.switch_group(new_group=future_group or authorized_groups[0], verbose=future_group is not None) self.is_analyst = 'analyst' in self.api_client.user_roles self.forecast_core_id = 'forecast-batch-internal' if self.is_analyst else 'forecast-batch' self.matcher_core_id = 'cov-selection-internal' if self.is_analyst else 'cov-selection' self.associator_core_id = 'associator-internal' if self.is_analyst else 'associator' self.hcfc_core_id = 'hcfc-internal' if self.is_analyst else 'hcfc' def __enter__(self) -> ExpertClient: return self def __exit__(self, exc_type: Optional[type[BaseException]], exc_value: Optional[BaseException], exc_tb: Any) -> None: """Cancel token refresh and logout if not a offline token is used. Parameters ---------- exc_type: typing.Optional[builtins.type[builtins.BaseException]] exc_value: typing.Optional[builtins.BaseException] exc_tb: typing.Any return: builtins.NoneType """ self.api_client.auto_refresh = False if 'offline_access' not in self.api_client.token['scope']: self.logout() @staticmethod def from_dotenv() -> ExpertClient: """Create an instance from a .env file or environment variables. Parameters ---------- return: futureexpert.expert_client.ExpertClient """ dotenv.load_dotenv() return ExpertClient() def logout(self) -> None: """Logout from futureEXPERT. If logged in with a refresh token. The refresh token is revoked. Parameters ---------- return: builtins.NoneType """ self.api_client.keycloak_openid.logout(self.api_client.token['refresh_token']) self.api_client.auto_refresh = False logger.info('Successfully logged out.') def switch_group(self, new_group: str, verbose: bool = True) -> None: """Switches the current group. Parameters ---------- new_group: builtins.str The name of the group to activate. verbose: builtins.bool If enabled, shows the group name in the log message. return: builtins.NoneType """ if new_group not in self.api_client.userinfo['groups']: raise RuntimeError(f'You are not authorized to access group {new_group}') self.group = new_group verbose_text = f' for group {self.group}' if verbose else '' logger.info(f'Successfully logged in{verbose_text}.') def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any: """Uploads the given raw data for further processing. Parameters ---------- source: typing.Union[pandas.core.frame.DataFrame, builtins.str] Path to a CSV file or a pandas data frame. file_specification: typing.Optional[futureexpert.checkin.FileSpecification] If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file. Returns ------- Identifier for the user Inputs. return: typing.Any """ df_file = None if isinstance(source, pd.DataFrame): if not file_specification: file_specification = FileSpecification() csv = source.to_csv(index=False, sep=file_specification.delimiter, decimal=file_specification.decimal, encoding='utf-8-sig') time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S') df_file = (f'expert-{time_stamp}.csv', csv) path = None else: path = source # TODO: currently only one file is supported here. upload_feedback = self.api_client.upload_user_inputs_for_group(self.group, path, df_file) return upload_feedback def check_data_definition(self, user_input_id: str, file_uuid: str, data_definition: DataDefinition, file_specification: FileSpecification = FileSpecification()) -> Any: """Checks the data definition. Removes specified rows and columns. Checks if column values have any issues. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. data_definition: futureexpert.checkin.DataDefinition Specifies the data, value and group columns and which rows and columns are to be removed first. file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. return: typing.Any """ payload = self._create_checkin_payload_1( user_input_id, file_uuid, data_definition, file_specification) logger.info('Started data definition using CHECK-IN...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-preprocessing', payload=payload, interval_status_check_in_seconds=5) error_message = result['error'] if error_message != '': raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}') logger.info('Finished data definition.') return result def create_time_series(self, user_input_id: str, file_uuid: str, data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> Any: """Last step of the CHECK-IN process which creates the time series. Aggregates the data and saves them to the database. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns are to be removed first. file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Configuration for the time series creation. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the _future_ frontend (now.future-forecasting.de). return: typing.Any """ logger.info('Transforming input data...') if config_ts_creation is None and config_checkin is None: raise ValueError('No configuration source is provided.') if config_ts_creation is not None and config_checkin is not None: raise ValueError('Only one configuration source can be processed.') if config_checkin is None and (data_definition is None or config_ts_creation is None): raise ValueError( 'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.') if config_ts_creation is not None and data_definition is not None: payload_1 = self._create_checkin_payload_1( user_input_id, file_uuid, data_definition, file_specification) payload = self._create_checkin_payload_2(payload_1, config_ts_creation) if config_checkin is not None: payload = self._build_payload_from_ui_config( user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin) logger.info('Creating time series using CHECK-IN...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-preprocessing', payload=payload, interval_status_check_in_seconds=5) error_message = result['error'] if error_message != '': raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}') logger.info('Finished time series creation.') return result def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition], description: Optional[str] = None) -> CheckInPoolResult: """Create a new version from a list of pool covariates and version ids. Parameters ---------- requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition] List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used. description: typing.Optional[builtins.str] A short description of the selected covariates. Returns ------- Result object with fields version_id and pool_cov_information. return: futureexpert.pool.CheckInPoolResult """ logger.info('Transforming input data...') payload: dict[str, Any] = { 'payload': { 'requested_indicators': [ {**covariate.model_dump(exclude_none=True), 'indicator_id': covariate.pool_cov_id} for covariate in requested_pool_covs ] } } for covariate in payload['payload']['requested_indicators']: covariate.pop('pool_cov_id', None) payload['payload']['version_description'] = description logger.info('Creating time series using checkin-pool...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-pool', payload=payload, interval_status_check_in_seconds=5) logger.info('Finished time series creation.') return CheckInPoolResult(**result['result']) def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) -> PoolCovOverview: """Gets an overview of all covariates available on POOL according to the given filters. Parameters ---------- granularity: typing.Optional[builtins.str] If set, returns only data matching that granularity (Day or Month). search: typing.Optional[builtins.str] If set, performs a full-text search and only returns data found in that search. Returns ------- PoolCovOverview object with tables containing the covariates with different levels of detail . return: futureexpert.pool.PoolCovOverview """ response_json = self.api_client.get_pool_cov_overview(granularity=granularity, search=search) return PoolCovOverview(response_json) def get_time_series(self, version_id: str) -> CheckInResult: """Get time series data. From previously checked-in data. Parameters --------- version_id: builtins.str Id of the time series version. Returns ------- Id of the time series version. Used to identifiy the time series and the values of the time series. return: futureexpert.checkin.CheckInResult """ result = self.api_client.get_ts_data(self.group, version_id) return CheckInResult(time_series=[TimeSeries(**ts) for ts in result], version_id=version_id) def check_in_time_series(self, raw_data_source: Union[pd.DataFrame, str], data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> str: """Checks in time series data that can be used as actuals or covariate data. Parameters ---------- raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str] Data frame that contains the raw data or path to where the CSV file with the data is stored. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns are to be removed. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Defines filter and aggreagtion level of the time series. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. Returns ------- Id of the time series version. Used to identifiy the time series. return: builtins.str """ upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification) user_input_id = upload_feedback['uuid'] file_id = upload_feedback['files'][0]['uuid'] response = self.create_time_series(user_input_id=user_input_id, file_uuid=file_id, data_definition=data_definition, config_ts_creation=config_ts_creation, config_checkin=config_checkin, file_specification=file_specification) return str(response['result']['tsVersion']) def _create_checkin_payload_1(self, user_input_id: str, file_uuid: str, data_definition: DataDefinition, file_specification: FileSpecification = FileSpecification()) -> Any: """Creates the payload for the CHECK-IN stage prepareDataset. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. data_definition: futureexpert.checkin.DataDefinition Specifies the data, value and group columns and which rows and columns are to be removed first. file_specification: futureexpert.checkin.FileSpecification Specify the format of the CSV file. Only relevant if a CSV was given as input. return: typing.Any """ return {'userInputId': user_input_id, 'payload': { 'stage': 'prepareDataset', 'fileUuid': file_uuid, 'meta': file_specification.model_dump(), 'performedTasks': { 'removedRows': data_definition.remove_rows, 'removedCols': data_definition.remove_columns }, 'columnDefinition': { 'dateColumns': [{snake_to_camel(key): value for key, value in data_definition.date_column.model_dump(exclude_none=True).items()}], 'valueColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()} for d in data_definition.value_columns], 'groupColumns': [{snake_to_camel(key): value for key, value in d.model_dump(exclude_none=True).items()} for d in data_definition.group_columns] } }} def _build_payload_from_ui_config(self, user_input_id: str, file_uuid: str, path: str) -> Any: """Creates the payload for the CHECK-IN stage createDataset. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. path: builtins.str Path to the JSON file. return: typing.Any """ with open(path) as file: file_data = file.read() json_data = json.loads(file_data) json_data['stage'] = 'createDataset' json_data['fileUuid'] = file_uuid del json_data["performedTasksLog"] return {'userInputId': user_input_id, 'payload': json_data} def _create_checkin_payload_2(self, payload: dict[str, Any], config: TsCreationConfig) -> Any: """Creates the payload for the CHECK-IN stage createDataset. Parameters ---------- payload: builtins.dict[builtins.str, typing.Any] Payload used in `create_checkin_payload_1`. config: futureexpert.checkin.TsCreationConfig Configuration for time series creation. return: typing.Any """ payload['payload']['rawDataReviewResults'] = {} payload['payload']['timeSeriesDatasetParameter'] = { 'aggregation': {'operator': 'sum', 'option': config.missing_value_handler}, 'date': { 'timeGranularity': config.time_granularity, 'startDate': config.start_date, 'endDate': config.end_date }, 'grouping': { 'dataLevel': config.grouping_level, 'saveHierarchy': config.save_hierarchy, 'filter': [d.model_dump() for d in config.filter] }, 'values': [{snake_to_camel(key): value for key, value in d.model_dump().items()} for d in config.new_variables], 'valueColumnsToSave': config.value_columns_to_save } payload['payload']['versionDescription'] = config.description payload['payload']['stage'] = 'createDataset' return payload def _create_reconciliation_payload(self, config: MakeForecastConsistentConfiguration) -> Any: """Creates the payload for forecast reconciliation. Parameters ---------- config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration Configuration of the make forecast consistent run. return: typing.Any """ config_dict = config.model_dump() return {'payload': config_dict} def _create_forecast_payload(self, version: str, config: ReportConfig) -> Any: """Creates the payload for the forecast. Parameters ---------- version: builtins.str Version of the time series that should get forecasts. config: futureexpert.forecast.ReportConfig Configuration of the forecast run. return: typing.Any """ config_dict = config.model_dump() config_dict['actuals_version'] = version config_dict['report_note'] = config_dict['title'] config_dict['cov_selection_report_id'] = config_dict['matcher_report_id'] config_dict['forecasting']['n_ahead'] = config_dict['forecasting']['fc_horizon'] config_dict['backtesting'] = config_dict['method_selection'] if config.rerun_report_id: config_dict['base_report_id'] = config.rerun_report_id config_dict['report_update_strategy'] = 'KEEP_OWN_RUNS' base_report_requested_run_status = ['Successful'] if 'NoEvaluation' not in config.rerun_status: base_report_requested_run_status.append('NoEvaluation') config_dict['base_report_requested_run_status'] = base_report_requested_run_status if config.pool_covs is not None: pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs) cast(list[str], config_dict['covs_versions']).append(pool_covs_checkin_result.version_id) config_dict.pop('pool_covs') config_dict.pop('title') config_dict['forecasting'].pop('fc_horizon') config_dict.pop('matcher_report_id') config_dict.pop('method_selection') config_dict.pop('rerun_report_id') config_dict.pop('rerun_status') payload = {'payload': config_dict} return payload def start_associator(self, config: AssociatorConfig) -> ReportIdentifier: """Sarts an associator report. Parameters ---------- config: futureexpert.associator.AssociatorConfig Configuration of the associator run. Returns ------- The identifier of the associator report. return: futureexpert.expert_client.ReportIdentifier """ config_dict = config.model_dump() payload = {'payload': config_dict} result = self.api_client.execute_action(group_id=self.group, core_id=self.associator_core_id, payload=payload, interval_status_check_in_seconds=5, check_intermediate_result=True) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Associator is running...') return report def start_forecast(self, version: str, config: ReportConfig, reconciliation_config: Optional[ReconciliationConfig] = None) -> ReportIdentifier: """Starts a forecasting report. Parameters ---------- version: builtins.str ID of a time series version. config: futureexpert.forecast.ReportConfig Configuration of the forecasting report. reconciliation_config: futureexpert.forecast.ReportConfig Configuration to make forecasts consistent over hierarchical levels. Reconciliation assumes time series are measured in comparable units. Returns ------- The identifier of the forecasting report. reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig] return: futureexpert.expert_client.ReportIdentifier """ if not self.is_analyst and (config.db_name is not None or config.priority is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.') if reconciliation_config is not None and reconciliation_config.enforce_forecast_minimum_constraint: raise ValueError('Minimum constraints for forecasts are only available via start_making_forecast_consistent.') version_data = self.api_client.get_ts_version(self.group, version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) logger.info('Preparing data for forecast...') payload = self._create_forecast_payload(version, config) logger.info('Finished data preparation for forecast.') logger.info('Started creating forecasting report with FORECAST...') result = self.api_client.execute_action(group_id=self.group, core_id=self.forecast_core_id, payload=payload, interval_status_check_in_seconds=5) forecast_identifier = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {forecast_identifier.report_id}. Forecasts are running...') if reconciliation_config is None: return forecast_identifier # Continue with forecast reconciliation data_selection = MakeForecastConsistentDataSelection( version=version, fc_report_id=forecast_identifier.report_id) forecast_consistency_config = MakeForecastConsistentConfiguration(db_name=config.db_name, reconciliation=reconciliation_config, data_selection=data_selection, report_note=config.title) forecast_consistency_identifier = self.start_making_forecast_consistent(config=forecast_consistency_config) return ChainedReportIdentifier.of(final_report_identifier=forecast_consistency_identifier, prerequisites=[forecast_identifier]) def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) -> ReportIdentifier: """Starts process of making forecasts hierarchically consistent. Parameters ---------- config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration Configuration of the make forecast consistent run. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ logger.info('Preparing data for forecast consistency...') if not self.is_analyst and (config.db_name is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name.') payload = self._create_reconciliation_payload(config) logger.info('Finished data preparation for forecast consistency.') logger.info('Started creating hierarchical reconciliation for consistent forecasts...') result = self.api_client.execute_action(group_id=self.group, core_id=self.hcfc_core_id, payload=payload, interval_status_check_in_seconds=5, check_intermediate_result=True) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...') return report def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str: """Gets the available reports, ordered from newest to oldest. Parameters ---------- skip The number of initial elements of the report list to skip limit The limit on the length of the report list Returns ------- String representation of the type of one report. report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier] return: builtins.str """ report_id = report_identifier.report_id if isinstance( report_identifier, ReportIdentifier) else report_identifier return self.api_client.get_report_type(group_id=self.group, report_id=report_id) def get_reports(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available reports, ordered from newest to oldest. Parameters ---------- skip: builtins.int The number of initial elements of the report list to skip: builtins.int limit: builtins.int The limit on the length of the report list Returns ------- The available reports from newest to oldest. return: pandas.core.frame.DataFrame """ group_reports = self.api_client.get_group_reports(group_id=self.group, skip=skip, limit=limit) vallidated_report_summarys = [ReportSummary.model_validate(report) for report in group_reports] return pd.DataFrame([report_summary.model_dump() for report_summary in vallidated_report_summarys]) def _get_single_report_status(self, report_identifier: ReportIdentifier, include_error_reason: bool = True) -> ReportStatus: """Gets the current status of a single report. Parameters ---------- id Report identifier. include_error_reason: builtins.bool Determines whether log messages are to be included in the result. Returns ------- The status of the report. report_identifier: futureexpert.expert_client.ReportIdentifier return: futureexpert.expert_client.ReportStatus """ raw_result = self.api_client.get_report_status(group_id=self.group, report_id=report_identifier.report_id, include_error_reason=include_error_reason) report_status = raw_result['status_summary'] created = report_status.get('Created', 0) successful = report_status.get('Successful', 0) noeval = report_status.get('NoEvaluation', 0) error = report_status.get('Error', 0) summary = ReportStatusProgress(requested=created, pending=created - successful - noeval - error, finished=successful + noeval + error) results = ReportStatusResults(successful=successful, no_evaluation=noeval, error=error) customer_specific = raw_result.get('customer_specific', None) assert (customer_specific is None or isinstance(customer_specific, dict)), 'unexpected type of customer_specific property' if include_error_reason: assert customer_specific is not None, 'missing customer_specific property in report status' error_reasons = ErrorReason.parse_error_reasons(customer_specific) else: error_reasons = None return ReportStatus(id=report_identifier, description=raw_result['description'], result_type=raw_result['result_type'], progress=summary, results=results, error_reasons=error_reasons) def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus: """Gets the current status of a report. If the provided report identifier includes prerequisites, the status of the prerequisites is included, too. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. include_error_reason: builtins.bool Determines whether log messages are to be included in the result. Returns ------- The status of the report. return: futureexpert.expert_client.ReportStatus """ identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None) final_status = self._get_single_report_status( report_identifier=identifier, include_error_reason=include_error_reason) if isinstance(identifier, ChainedReportIdentifier): for prerequisite_identifier in identifier.prerequisites: prerequisite_status = self.get_report_status(id=prerequisite_identifier, include_error_reason=include_error_reason) final_status.prerequisites.append(prerequisite_status) return final_status def get_fc_results(self, id: Union[ReportIdentifier, int], include_k_best_models: int = 1, include_backtesting: bool = False, include_discarded_models: bool = False) -> ForecastResults: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Forecast identifier or plain report ID. include_k_best_models: builtins.int Number of k best models for which results are to be returned. include_backtesting: builtins.bool Determines whether backtesting results are to be returned. include_discarded_models: builtins.bool Determines if models excluded from ranking should be included in the result. return: futureexpert.forecast.ForecastResults """ report_id = id.report_id if isinstance(id, ReportIdentifier) else id if self.get_report_type(report_identifier=report_id) not in ['forecast', 'MongoForecastingResultSink', 'hierarchical-forecast']: raise ValueError('The given report ID does not belong to a FORECAST result. ' + 'Please input a different ID or use another result getter function.') if include_k_best_models < 1: raise ValueError('At least one model is needed.') raw_forecast_results = self.api_client.get_fc_results(group_id=self.group, report_id=report_id, include_k_best_models=include_k_best_models, include_backtesting=include_backtesting, include_discarded_models=include_discarded_models) result = ForecastResults(forecast_results=[ForecastResult.model_validate(result) for result in raw_forecast_results['forecast_results']]) raw_forecast_consistency = raw_forecast_results['consistency'] if raw_forecast_consistency is not None: result.consistency = ConsistentForecastMetadata.model_validate(raw_forecast_consistency) return result def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: builtins.list[futureexpert.matcher.MatcherResult] """ if self.get_report_type(report_identifier=id) not in ['matcher', 'CovariateSelection']: raise ValueError('The given report ID does not belong to a MATCHER result. ' + 'Please input a different ID or use another result getter function.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id results = self.api_client.get_matcher_results(group_id=self.group, report_id=report_id) return [MatcherResult(**result) for result in results] def get_associator_results(self, id: Union[ReportIdentifier, int]) -> AssociatorResult: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: builtins.list[futureexpert.matcher.MatcherResult] """ if self.get_report_type(report_identifier=id) != 'associator': raise ValueError('The given report ID does not belong to an ASSOCIATOR result. ' + 'Please input a different ID.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id result: dict[str, Any] = self.api_client.get_associator_results(group_id=self.group, report_id=report_id) actuals_version = result.pop('actuals') result['input'] = self.api_client.get_ts_data(self.group, actuals_version) return AssociatorResult(**result) def get_ts_versions(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored. Parameters ---------- skip: builtins.int The number of initial elements of the version list to skip: builtins.int limit: builtins.int The limit on the length of the versjion list Returns ------- Overview of the available time series versions. return: pandas.core.frame.DataFrame """ results = self.api_client.get_group_ts_versions(self.group, skip, limit) transformed_results = [] for version in results: transformed_results.append(TimeSeriesVersion( version_id=version['_id'], description=version.get('description', None), creation_time_utc=version.get('creation_time_utc', None), keep_until_utc=version['customer_specific'].get('keep_until_utc', None) )) transformed_results.sort(key=lambda x: x.creation_time_utc, reverse=True) return pd.DataFrame([res.model_dump() for res in transformed_results]) def start_forecast_from_raw_data(self, raw_data_source: Union[pd.DataFrame, str], config_fc: ReportConfig, data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier: """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation. Parameters ---------- raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str] A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored. config_fc: futureexpert.forecast.ReportConfig The configuration of the forecast run. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns should be removed. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Defines filter and aggreagtion level of the time series. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.' upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification) user_input_id = upload_feedback['uuid'] file_id = upload_feedback['files'][0]['uuid'] res2 = self.create_time_series(user_input_id=user_input_id, file_uuid=file_id, data_definition=data_definition, config_ts_creation=config_ts_creation, config_checkin=config_checkin, file_specification=file_specification) version = res2['result']['tsVersion'] return self.start_forecast(version=version, config=config_fc) def start_matcher(self, config: MatcherConfig) -> ReportIdentifier: """Starts a covariate matcher report. Parameters ---------- version ID of a time series version config: futureexpert.matcher.MatcherConfig Configuration of the covariate matcher report. Returns ------- The identifier of the covariate matcher report. return: futureexpert.expert_client.ReportIdentifier """ version_data = self.api_client.get_ts_version(self.group, config.actuals_version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) if not self.is_analyst and config.db_name is not None: raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.') payload = self._create_matcher_payload(config) result = self.api_client.execute_action(group_id=self.group, core_id=self.matcher_core_id, payload=payload, interval_status_check_in_seconds=5) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Matching indicators...') return report def _create_matcher_payload(self, config: MatcherConfig) -> Any: """Converts the MatcherConfig into the payload needed for the cov-selection core. Parameters ---------- config: futureexpert.matcher.MatcherConfig return: typing.Any """ all_covs_versions = config.covs_versions if config.pool_covs is not None: pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs) all_covs_versions.append(pool_covs_checkin_result.version_id) base_report_requested_run_status = ['Successful'] if 'NoEvaluation' not in config.rerun_status: base_report_requested_run_status.append('NoEvaluation') config_dict: dict[str, Any] = { 'report_description': config.title, 'db_name': config.db_name, 'data_config': { 'actuals_version': config.actuals_version, 'actuals_filter': config.actuals_filter, 'covs_versions': all_covs_versions, 'covs_filter': config.covs_filter, }, "compute_config": { "evaluation_start_date": config.evaluation_start_date, "evaluation_end_date": config.evaluation_end_date, 'max_ts_len': config.max_ts_len, "base_report_id": config.rerun_report_id, "base_report_requested_run_status": base_report_requested_run_status, "report_update_strategy": 'KEEP_OWN_RUNS', "cov_names": { 'cov_name_prefix': '', 'cov_name_field': 'name', 'cov_name_suffix': '', }, "preselection": { "num_obs_short_term_class": 36, "max_publication_lag": config.max_publication_lag, }, "postselection": { "num_obs_short_term_correlation": 60, "associator_report_id": config.associator_report_id, "use_clustering_results": config.use_clustering_results, "post_selection_queries": config.post_selection_queries, "post_selection_concatenation_operator": "&", "protected_selections_queries": [], "protected_selections_concatenation_operator": "&" }, "enable_leading_covariate_selection": config.enable_leading_covariate_selection, "fixed_season_length": config.fixed_season_length, "lag_selection": { "fixed_lags": config.lag_selection.fixed_lags, "min_lag": config.lag_selection.min_lag, "max_lag": config.lag_selection.max_lag, } } } return {'payload': config_dict}FutureEXPERT client.
Initializer.
Login using either your user credentials or a valid refresh token.
Parameters
user- The username for the future platform. If not provided, the username is read from environment variable FUTURE_USER.
password- The password for the future platform. If not provided, the password is read from environment variable FUTURE_PW.
totp- Optional second factor for authentication using user credentials.
refresh_token- Alternative login using a refresh token only instead of user credentials.
If not provided, the token is read from the environment variable FUTURE_REFRESH_TOKEN.
You can retrieve a long-lived refresh token (offline token) from our identity provider
using Open ID Connect scope
offline_accessat the token endpoint. Example: curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' -H 'Content-Type: application/x-www-form-urlencoded' –data-urlencode 'client_id=expert' –data-urlencode 'grant_type=password' –data-urlencode 'scope=openid offline_access' –data-urlencode "username=$FUTURE_USER" –data-urlencode "password=$FUTURE_PW" | jq .refresh_token group- Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
environment- Optionally the future environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
Static methods
def from_dotenv() ‑> ExpertClient-
Expand source code
@staticmethod def from_dotenv() -> ExpertClient: """Create an instance from a .env file or environment variables. Parameters ---------- return: futureexpert.expert_client.ExpertClient """ dotenv.load_dotenv() return ExpertClient()
Methods
def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any-
Expand source code
def check_data_definition(self, user_input_id: str, file_uuid: str, data_definition: DataDefinition, file_specification: FileSpecification = FileSpecification()) -> Any: """Checks the data definition. Removes specified rows and columns. Checks if column values have any issues. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. data_definition: futureexpert.checkin.DataDefinition Specifies the data, value and group columns and which rows and columns are to be removed first. file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. return: typing.Any """ payload = self._create_checkin_payload_1( user_input_id, file_uuid, data_definition, file_specification) logger.info('Started data definition using CHECK-IN...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-preprocessing', payload=payload, interval_status_check_in_seconds=5) error_message = result['error'] if error_message != '': raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}') logger.info('Finished data definition.') return resultChecks the data definition.
Removes specified rows and columns. Checks if column values have any issues.
Parameters
user_input_id:builtins.str- UUID of the user input.
file_uuid:builtins.str- UUID of the file.
data_definition:DataDefinition- Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
return:typing.Any
def check_in_pool_covs(self,
requested_pool_covs: list[PoolCovDefinition],
description: Optional[str] = None) ‑> CheckInPoolResult-
Expand source code
def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition], description: Optional[str] = None) -> CheckInPoolResult: """Create a new version from a list of pool covariates and version ids. Parameters ---------- requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition] List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used. description: typing.Optional[builtins.str] A short description of the selected covariates. Returns ------- Result object with fields version_id and pool_cov_information. return: futureexpert.pool.CheckInPoolResult """ logger.info('Transforming input data...') payload: dict[str, Any] = { 'payload': { 'requested_indicators': [ {**covariate.model_dump(exclude_none=True), 'indicator_id': covariate.pool_cov_id} for covariate in requested_pool_covs ] } } for covariate in payload['payload']['requested_indicators']: covariate.pop('pool_cov_id', None) payload['payload']['version_description'] = description logger.info('Creating time series using checkin-pool...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-pool', payload=payload, interval_status_check_in_seconds=5) logger.info('Finished time series creation.') return CheckInPoolResult(**result['result'])Create a new version from a list of pool covariates and version ids.
Parameters
requested_pool_covs:builtins.list[PoolCovDefinition]- List of pool covariate definitions. Each definition consists of an pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used.
description:typing.Optional[builtins.str]- A short description of the selected covariates.
Returns
- Result object with fields version_id and pool_cov_information.
return:CheckInPoolResult
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> str-
Expand source code
def check_in_time_series(self, raw_data_source: Union[pd.DataFrame, str], data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> str: """Checks in time series data that can be used as actuals or covariate data. Parameters ---------- raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str] Data frame that contains the raw data or path to where the CSV file with the data is stored. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns are to be removed. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Defines filter and aggreagtion level of the time series. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. Returns ------- Id of the time series version. Used to identifiy the time series. return: builtins.str """ upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification) user_input_id = upload_feedback['uuid'] file_id = upload_feedback['files'][0]['uuid'] response = self.create_time_series(user_input_id=user_input_id, file_uuid=file_id, data_definition=data_definition, config_ts_creation=config_ts_creation, config_checkin=config_checkin, file_specification=file_specification) return str(response['result']['tsVersion'])Checks in time series data that can be used as actuals or covariate data.
Parameters
raw_data_source:typing.Union[pandas.core.frame.DataFrame, builtins.str]- Data frame that contains the raw data or path to where the CSV file with the data is stored.
data_definition:typing.Optional[DataDefinition]- Specifies the data, value and group columns and which rows and columns are to be removed.
config_ts_creation:typing.Optional[TsCreationConfig]- Defines filter and aggreagtion level of the time series.
config_checkin:typing.Optional[builtins.str]- Path to the JSON file with the CHECK-IN configuration.
config_ts_creationandconfig_checkincannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
Returns
- Id of the time series version. Used to identifiy the time series.
return:builtins.str
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any-
Expand source code
def create_time_series(self, user_input_id: str, file_uuid: str, data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> Any: """Last step of the CHECK-IN process which creates the time series. Aggregates the data and saves them to the database. Parameters ---------- user_input_id: builtins.str UUID of the user input. file_uuid: builtins.str UUID of the file. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns are to be removed first. file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Configuration for the time series creation. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the _future_ frontend (now.future-forecasting.de). return: typing.Any """ logger.info('Transforming input data...') if config_ts_creation is None and config_checkin is None: raise ValueError('No configuration source is provided.') if config_ts_creation is not None and config_checkin is not None: raise ValueError('Only one configuration source can be processed.') if config_checkin is None and (data_definition is None or config_ts_creation is None): raise ValueError( 'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.') if config_ts_creation is not None and data_definition is not None: payload_1 = self._create_checkin_payload_1( user_input_id, file_uuid, data_definition, file_specification) payload = self._create_checkin_payload_2(payload_1, config_ts_creation) if config_checkin is not None: payload = self._build_payload_from_ui_config( user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin) logger.info('Creating time series using CHECK-IN...') result = self.api_client.execute_action(group_id=self.group, core_id='checkin-preprocessing', payload=payload, interval_status_check_in_seconds=5) error_message = result['error'] if error_message != '': raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}') logger.info('Finished time series creation.') return resultLast step of the CHECK-IN process which creates the time series.
Aggregates the data and saves them to the database.
Parameters
user_input_id:builtins.str- UUID of the user input.
file_uuid:builtins.str- UUID of the file.
data_definition:typing.Optional[DataDefinition]- Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
config_ts_creation:typing.Optional[TsCreationConfig]- Configuration for the time series creation.
config_checkin:typing.Optional[builtins.str]- Path to the JSON file with the CHECK-IN configuration.
config_ts_creationandconfig_checkincannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). return:typing.Any
def get_associator_results(self,
id: Union[ReportIdentifier, int]) ‑> AssociatorResult-
Expand source code
def get_associator_results(self, id: Union[ReportIdentifier, int]) -> AssociatorResult: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: builtins.list[futureexpert.matcher.MatcherResult] """ if self.get_report_type(report_identifier=id) != 'associator': raise ValueError('The given report ID does not belong to an ASSOCIATOR result. ' + 'Please input a different ID.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id result: dict[str, Any] = self.api_client.get_associator_results(group_id=self.group, report_id=report_id) actuals_version = result.pop('actuals') result['input'] = self.api_client.get_ts_data(self.group, actuals_version) return AssociatorResult(**result)Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
return:builtins.list[MatcherResult]
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False,
include_discarded_models: bool = False) ‑> ForecastResults-
Expand source code
def get_fc_results(self, id: Union[ReportIdentifier, int], include_k_best_models: int = 1, include_backtesting: bool = False, include_discarded_models: bool = False) -> ForecastResults: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Forecast identifier or plain report ID. include_k_best_models: builtins.int Number of k best models for which results are to be returned. include_backtesting: builtins.bool Determines whether backtesting results are to be returned. include_discarded_models: builtins.bool Determines if models excluded from ranking should be included in the result. return: futureexpert.forecast.ForecastResults """ report_id = id.report_id if isinstance(id, ReportIdentifier) else id if self.get_report_type(report_identifier=report_id) not in ['forecast', 'MongoForecastingResultSink', 'hierarchical-forecast']: raise ValueError('The given report ID does not belong to a FORECAST result. ' + 'Please input a different ID or use another result getter function.') if include_k_best_models < 1: raise ValueError('At least one model is needed.') raw_forecast_results = self.api_client.get_fc_results(group_id=self.group, report_id=report_id, include_k_best_models=include_k_best_models, include_backtesting=include_backtesting, include_discarded_models=include_discarded_models) result = ForecastResults(forecast_results=[ForecastResult.model_validate(result) for result in raw_forecast_results['forecast_results']]) raw_forecast_consistency = raw_forecast_results['consistency'] if raw_forecast_consistency is not None: result.consistency = ConsistentForecastMetadata.model_validate(raw_forecast_consistency) return resultGets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Forecast identifier or plain report ID.
include_k_best_models:builtins.int- Number of k best models for which results are to be returned.
include_backtesting:builtins.bool- Determines whether backtesting results are to be returned.
include_discarded_models:builtins.bool- Determines if models excluded from ranking should be included in the result.
return:ForecastResults
def get_matcher_results(self,
id: Union[ReportIdentifier, int]) ‑> list[MatcherResult]-
Expand source code
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: builtins.list[futureexpert.matcher.MatcherResult] """ if self.get_report_type(report_identifier=id) not in ['matcher', 'CovariateSelection']: raise ValueError('The given report ID does not belong to a MATCHER result. ' + 'Please input a different ID or use another result getter function.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id results = self.api_client.get_matcher_results(group_id=self.group, report_id=report_id) return [MatcherResult(**result) for result in results]Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
return:builtins.list[MatcherResult]
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview-
Expand source code
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) -> PoolCovOverview: """Gets an overview of all covariates available on POOL according to the given filters. Parameters ---------- granularity: typing.Optional[builtins.str] If set, returns only data matching that granularity (Day or Month). search: typing.Optional[builtins.str] If set, performs a full-text search and only returns data found in that search. Returns ------- PoolCovOverview object with tables containing the covariates with different levels of detail . return: futureexpert.pool.PoolCovOverview """ response_json = self.api_client.get_pool_cov_overview(granularity=granularity, search=search) return PoolCovOverview(response_json)Gets an overview of all covariates available on POOL according to the given filters.
Parameters
granularity:typing.Optional[builtins.str]- If set, returns only data matching that granularity (Day or Month).
search:typing.Optional[builtins.str]- If set, performs a full-text search and only returns data found in that search.
Returns
PoolCovOverview object with tables containing the covariates with- different levels of detail .
return:PoolCovOverview
def get_report_status(self,
id: Union[ReportIdentifier, int],
include_error_reason: bool = True) ‑> ReportStatus-
Expand source code
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus: """Gets the current status of a report. If the provided report identifier includes prerequisites, the status of the prerequisites is included, too. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. include_error_reason: builtins.bool Determines whether log messages are to be included in the result. Returns ------- The status of the report. return: futureexpert.expert_client.ReportStatus """ identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None) final_status = self._get_single_report_status( report_identifier=identifier, include_error_reason=include_error_reason) if isinstance(identifier, ChainedReportIdentifier): for prerequisite_identifier in identifier.prerequisites: prerequisite_status = self.get_report_status(id=prerequisite_identifier, include_error_reason=include_error_reason) final_status.prerequisites.append(prerequisite_status) return final_statusGets the current status of a report.
If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
include_error_reason:builtins.bool- Determines whether log messages are to be included in the result.
Returns
- The status of the report.
return:ReportStatus
def get_report_type(self,
report_identifier: Union[int, ReportIdentifier]) ‑> str-
Expand source code
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str: """Gets the available reports, ordered from newest to oldest. Parameters ---------- skip The number of initial elements of the report list to skip limit The limit on the length of the report list Returns ------- String representation of the type of one report. report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier] return: builtins.str """ report_id = report_identifier.report_id if isinstance( report_identifier, ReportIdentifier) else report_identifier return self.api_client.get_report_type(group_id=self.group, report_id=report_id)Gets the available reports, ordered from newest to oldest.
Parameters ---------- skip The number of initial elements of the report list to skip limit The limit on the length of the report list Returns ------- String representation of the type of one report. report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier]return: builtins.str
def get_reports(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame-
Expand source code
def get_reports(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available reports, ordered from newest to oldest. Parameters ---------- skip: builtins.int The number of initial elements of the report list to skip: builtins.int limit: builtins.int The limit on the length of the report list Returns ------- The available reports from newest to oldest. return: pandas.core.frame.DataFrame """ group_reports = self.api_client.get_group_reports(group_id=self.group, skip=skip, limit=limit) vallidated_report_summarys = [ReportSummary.model_validate(report) for report in group_reports] return pd.DataFrame([report_summary.model_dump() for report_summary in vallidated_report_summarys])Gets the available reports, ordered from newest to oldest.
Parameters
skip:builtins.int- The number of initial elements of the report list to skip: builtins.int
limit:builtins.int- The limit on the length of the report list
Returns
- The available reports from newest to oldest.
return:pandas.core.frame.DataFrame
def get_time_series(self, version_id: str) ‑> CheckInResult-
Expand source code
def get_time_series(self, version_id: str) -> CheckInResult: """Get time series data. From previously checked-in data. Parameters --------- version_id: builtins.str Id of the time series version. Returns ------- Id of the time series version. Used to identifiy the time series and the values of the time series. return: futureexpert.checkin.CheckInResult """ result = self.api_client.get_ts_data(self.group, version_id) return CheckInResult(time_series=[TimeSeries(**ts) for ts in result], version_id=version_id)Get time series data. From previously checked-in data.
Parameters
version_id:builtins.str- Id of the time series version.
Returns
- Id of the time series version. Used to identifiy the time series and the values of the time series.
return:CheckInResult
def get_ts_versions(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame-
Expand source code
def get_ts_versions(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored. Parameters ---------- skip: builtins.int The number of initial elements of the version list to skip: builtins.int limit: builtins.int The limit on the length of the versjion list Returns ------- Overview of the available time series versions. return: pandas.core.frame.DataFrame """ results = self.api_client.get_group_ts_versions(self.group, skip, limit) transformed_results = [] for version in results: transformed_results.append(TimeSeriesVersion( version_id=version['_id'], description=version.get('description', None), creation_time_utc=version.get('creation_time_utc', None), keep_until_utc=version['customer_specific'].get('keep_until_utc', None) )) transformed_results.sort(key=lambda x: x.creation_time_utc, reverse=True) return pd.DataFrame([res.model_dump() for res in transformed_results])Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored.
Parameters
skip:builtins.int- The number of initial elements of the version list to skip: builtins.int
limit:builtins.int- The limit on the length of the versjion list
Returns
- Overview of the available time series versions.
return:pandas.core.frame.DataFrame
def logout(self) ‑> None-
Expand source code
def logout(self) -> None: """Logout from futureEXPERT. If logged in with a refresh token. The refresh token is revoked. Parameters ---------- return: builtins.NoneType """ self.api_client.keycloak_openid.logout(self.api_client.token['refresh_token']) self.api_client.auto_refresh = False logger.info('Successfully logged out.')Logout from futureEXPERT.
If logged in with a refresh token. The refresh token is revoked.Parameters
return:builtins.NoneType
def start_associator(self, config: AssociatorConfig) ‑> ReportIdentifier-
Expand source code
def start_associator(self, config: AssociatorConfig) -> ReportIdentifier: """Sarts an associator report. Parameters ---------- config: futureexpert.associator.AssociatorConfig Configuration of the associator run. Returns ------- The identifier of the associator report. return: futureexpert.expert_client.ReportIdentifier """ config_dict = config.model_dump() payload = {'payload': config_dict} result = self.api_client.execute_action(group_id=self.group, core_id=self.associator_core_id, payload=payload, interval_status_check_in_seconds=5, check_intermediate_result=True) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Associator is running...') return reportSarts an associator report.
Parameters
config:AssociatorConfig- Configuration of the associator run.
Returns
- The identifier of the associator report.
return:ReportIdentifier
def start_forecast(self,
version: str,
config: ReportConfig,
reconciliation_config: Optional[ReconciliationConfig] = None) ‑> ReportIdentifier-
Expand source code
def start_forecast(self, version: str, config: ReportConfig, reconciliation_config: Optional[ReconciliationConfig] = None) -> ReportIdentifier: """Starts a forecasting report. Parameters ---------- version: builtins.str ID of a time series version. config: futureexpert.forecast.ReportConfig Configuration of the forecasting report. reconciliation_config: futureexpert.forecast.ReportConfig Configuration to make forecasts consistent over hierarchical levels. Reconciliation assumes time series are measured in comparable units. Returns ------- The identifier of the forecasting report. reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig] return: futureexpert.expert_client.ReportIdentifier """ if not self.is_analyst and (config.db_name is not None or config.priority is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.') if reconciliation_config is not None and reconciliation_config.enforce_forecast_minimum_constraint: raise ValueError('Minimum constraints for forecasts are only available via start_making_forecast_consistent.') version_data = self.api_client.get_ts_version(self.group, version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) logger.info('Preparing data for forecast...') payload = self._create_forecast_payload(version, config) logger.info('Finished data preparation for forecast.') logger.info('Started creating forecasting report with FORECAST...') result = self.api_client.execute_action(group_id=self.group, core_id=self.forecast_core_id, payload=payload, interval_status_check_in_seconds=5) forecast_identifier = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {forecast_identifier.report_id}. Forecasts are running...') if reconciliation_config is None: return forecast_identifier # Continue with forecast reconciliation data_selection = MakeForecastConsistentDataSelection( version=version, fc_report_id=forecast_identifier.report_id) forecast_consistency_config = MakeForecastConsistentConfiguration(db_name=config.db_name, reconciliation=reconciliation_config, data_selection=data_selection, report_note=config.title) forecast_consistency_identifier = self.start_making_forecast_consistent(config=forecast_consistency_config) return ChainedReportIdentifier.of(final_report_identifier=forecast_consistency_identifier, prerequisites=[forecast_identifier])Starts a forecasting report.
Parameters ---------- version: builtins.str ID of a time series version. config: futureexpert.forecast.ReportConfig Configuration of the forecasting report. reconciliation_config: futureexpert.forecast.ReportConfig Configuration to make forecasts consistent over hierarchical levels. Reconciliation assumes time series are measured in comparable units. Returns ------- The identifier of the forecasting report. reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]return: futureexpert.expert_client.ReportIdentifier
def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier-
Expand source code
def start_forecast_from_raw_data(self, raw_data_source: Union[pd.DataFrame, str], config_fc: ReportConfig, data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier: """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation. Parameters ---------- raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str] A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored. config_fc: futureexpert.forecast.ReportConfig The configuration of the forecast run. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns should be removed. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Defines filter and aggreagtion level of the time series. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.' upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification) user_input_id = upload_feedback['uuid'] file_id = upload_feedback['files'][0]['uuid'] res2 = self.create_time_series(user_input_id=user_input_id, file_uuid=file_id, data_definition=data_definition, config_ts_creation=config_ts_creation, config_checkin=config_checkin, file_specification=file_specification) version = res2['result']['tsVersion'] return self.start_forecast(version=version, config=config_fc)Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.
Parameters
raw_data_source:typing.Union[pandas.core.frame.DataFrame, builtins.str]- A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc:ReportConfig- The configuration of the forecast run.
data_definition:typing.Optional[DataDefinition]- Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation:typing.Optional[TsCreationConfig]- Defines filter and aggreagtion level of the time series.
config_checkin:typing.Optional[builtins.str]- Path to the JSON file with the CHECK-IN configuration.
config_ts_creationandconfig_checkincannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
Returns
- The identifier of the forecasting report.
return:ReportIdentifier
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) ‑> ReportIdentifier-
Expand source code
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) -> ReportIdentifier: """Starts process of making forecasts hierarchically consistent. Parameters ---------- config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration Configuration of the make forecast consistent run. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ logger.info('Preparing data for forecast consistency...') if not self.is_analyst and (config.db_name is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name.') payload = self._create_reconciliation_payload(config) logger.info('Finished data preparation for forecast consistency.') logger.info('Started creating hierarchical reconciliation for consistent forecasts...') result = self.api_client.execute_action(group_id=self.group, core_id=self.hcfc_core_id, payload=payload, interval_status_check_in_seconds=5, check_intermediate_result=True) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...') return reportStarts process of making forecasts hierarchically consistent.
Parameters
config:MakeForecastConsistentConfiguration- Configuration of the make forecast consistent run.
Returns
- The identifier of the forecasting report.
return:ReportIdentifier
def start_matcher(self, config: MatcherConfig) ‑> ReportIdentifier-
Expand source code
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier: """Starts a covariate matcher report. Parameters ---------- version ID of a time series version config: futureexpert.matcher.MatcherConfig Configuration of the covariate matcher report. Returns ------- The identifier of the covariate matcher report. return: futureexpert.expert_client.ReportIdentifier """ version_data = self.api_client.get_ts_version(self.group, config.actuals_version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) if not self.is_analyst and config.db_name is not None: raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.') payload = self._create_matcher_payload(config) result = self.api_client.execute_action(group_id=self.group, core_id=self.matcher_core_id, payload=payload, interval_status_check_in_seconds=5) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Matching indicators...') return reportStarts a covariate matcher report.
Parameters
version- ID of a time series version
config:MatcherConfig- Configuration of the covariate matcher report.
Returns
- The identifier of the covariate matcher report.
return:ReportIdentifier
def switch_group(self, new_group: str, verbose: bool = True) ‑> None-
Expand source code
def switch_group(self, new_group: str, verbose: bool = True) -> None: """Switches the current group. Parameters ---------- new_group: builtins.str The name of the group to activate. verbose: builtins.bool If enabled, shows the group name in the log message. return: builtins.NoneType """ if new_group not in self.api_client.userinfo['groups']: raise RuntimeError(f'You are not authorized to access group {new_group}') self.group = new_group verbose_text = f' for group {self.group}' if verbose else '' logger.info(f'Successfully logged in{verbose_text}.')Switches the current group.
Parameters
new_group:builtins.str- The name of the group to activate.
verbose:builtins.bool- If enabled, shows the group name in the log message.
return:builtins.NoneType
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any-
Expand source code
def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any: """Uploads the given raw data for further processing. Parameters ---------- source: typing.Union[pandas.core.frame.DataFrame, builtins.str] Path to a CSV file or a pandas data frame. file_specification: typing.Optional[futureexpert.checkin.FileSpecification] If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file. Returns ------- Identifier for the user Inputs. return: typing.Any """ df_file = None if isinstance(source, pd.DataFrame): if not file_specification: file_specification = FileSpecification() csv = source.to_csv(index=False, sep=file_specification.delimiter, decimal=file_specification.decimal, encoding='utf-8-sig') time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S') df_file = (f'expert-{time_stamp}.csv', csv) path = None else: path = source # TODO: currently only one file is supported here. upload_feedback = self.api_client.upload_user_inputs_for_group(self.group, path, df_file) return upload_feedbackUploads the given raw data for further processing.
Parameters
source:typing.Union[pandas.core.frame.DataFrame, builtins.str]- Path to a CSV file or a pandas data frame.
file_specification:typing.Optional[FileSpecification]- If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file.
Returns
- Identifier for the user Inputs.
return:typing.Any
class MissingCredentialsError (missing_credential_type: str)-
Expand source code
class MissingCredentialsError(RuntimeError): def __init__(self, missing_credential_type: str) -> None: super().__init__(f'Please enter {missing_credential_type} either when ' + 'initializing the expert client or in the the .env file!')Unspecified run-time error.
Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class ReportIdentifier (**data: Any)-
Expand source code
class ReportIdentifier(pydantic.BaseModel): """Report ID and Settings ID of a report. Required to identify the report, e.g. when retrieving the results. Parameters ---------- report_id: builtins.int settings_id: typing.Optional[builtins.int] """ report_id: int settings_id: Optional[int]Report ID and Settings ID of a report. Required to identify the report, e.g. when retrieving the results.
Parameters
report_id:builtins.intsettings_id:typing.Optional[builtins.int]
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Subclasses
Class variables
var model_configvar report_id : intvar settings_id : int | None
class ReportStatus (**data: Any)-
Expand source code
class ReportStatus(pydantic.BaseModel): """Status of a forecast or matcher report. Parameters ---------- id: futureexpert.expert_client.ReportIdentifier The identifier of the report. description: builtins.str The description of the report. result_type: builtins.str The result type of the report. progress: futureexpert.expert_client.ReportStatusProgress Progress summary of the report. results: futureexpert.expert_client.ReportStatusResults Success/error summary of the report. error_reasons: typing.Optional[builtins.list[futureexpert.expert_client.ErrorReason]] Details about the errors of the report. Each error reason contains the status, error message, and list of affected time series. prerequisites: builtins.list[futureexpert.expert_client.ReportStatus] If the status was requested for a report that depends on other reports (ChainedReportIdentifier) all other report statuses are contained in the prerequisites in order to get an easy overview. """ id: ReportIdentifier description: str result_type: str progress: ReportStatusProgress results: ReportStatusResults error_reasons: Optional[list[ErrorReason]] = None prerequisites: list[ReportStatus] = pydantic.Field(default_factory=list) @property def is_finished(self) -> bool: """Indicates whether a forecasting report is finished.""" return self.progress.pending == 0 def print(self, print_prerequisites: bool = True, print_error_reasons: bool = True) -> None: """Prints a summary of the status. Parameters ---------- print_prerequisites: builtins.bool Enable or disable printing of prerequisite reports. print_error_reasons: builtins.bool Enable or disable printing of error reasons. return: builtins.NoneType """ title = f'Status of report "{self.description}" of type "{self.result_type}":' run_description = 'time series' if self.result_type in ['forecast', 'matcher'] else 'runs' if print_prerequisites: for prerequisite in self.prerequisites: prerequisite.print(print_error_reasons=print_error_reasons) if self.progress.requested == 0: print(f'{title}\n No {run_description} created') return pct_txt = f'{round(self.progress.finished/self.progress.requested*100)} % are finished' overall = f'{self.progress.requested} {run_description} requested for calculation' finished_txt = f'{self.progress.finished} {run_description} finished' noeval_txt = f'{self.results.no_evaluation} {run_description} without evaluation' error_txt = f'{self.results.error} {run_description} ran into an error' print(f'{title}\n {pct_txt} \n {overall} \n {finished_txt} \n {noeval_txt} \n {error_txt}') if print_error_reasons and self.error_reasons is not None and len(self.error_reasons) > 0: print('\nError reasons:') for error_reason in self.error_reasons: ts_count = len(error_reason.timeseries) ts_names = ', '.join(error_reason.timeseries[:3]) # Show first 3 time series if ts_count > 3: ts_names += f' ... and {ts_count - 3} more' print(f' [{error_reason.status}] {error_reason.error_message if error_reason.error_message else ""}') print(f' Affected time series ({ts_count}): {ts_names}')Status of a forecast or matcher report.
Parameters
id:ReportIdentifier- The identifier of the report.
description:builtins.str- The description of the report.
result_type:builtins.str- The result type of the report.
progress:ReportStatusProgress- Progress summary of the report.
results:ReportStatusResults- Success/error summary of the report.
error_reasons:typing.Optional[builtins.list[ErrorReason]]- Details about the errors of the report. Each error reason contains the status, error message, and list of affected time series.
prerequisites:builtins.list[ReportStatus]- If the status was requested for a report that depends on other reports (ChainedReportIdentifier) all other report statuses are contained in the prerequisites in order to get an easy overview.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var description : strvar error_reasons : list[ErrorReason] | Nonevar id : ReportIdentifiervar model_configvar prerequisites : list[ReportStatus]var progress : ReportStatusProgressvar result_type : strvar results : ReportStatusResults
Instance variables
prop is_finished : bool-
Expand source code
@property def is_finished(self) -> bool: """Indicates whether a forecasting report is finished.""" return self.progress.pending == 0Indicates whether a forecasting report is finished.
Methods
def print(self, print_prerequisites: bool = True, print_error_reasons: bool = True) ‑> None-
Expand source code
def print(self, print_prerequisites: bool = True, print_error_reasons: bool = True) -> None: """Prints a summary of the status. Parameters ---------- print_prerequisites: builtins.bool Enable or disable printing of prerequisite reports. print_error_reasons: builtins.bool Enable or disable printing of error reasons. return: builtins.NoneType """ title = f'Status of report "{self.description}" of type "{self.result_type}":' run_description = 'time series' if self.result_type in ['forecast', 'matcher'] else 'runs' if print_prerequisites: for prerequisite in self.prerequisites: prerequisite.print(print_error_reasons=print_error_reasons) if self.progress.requested == 0: print(f'{title}\n No {run_description} created') return pct_txt = f'{round(self.progress.finished/self.progress.requested*100)} % are finished' overall = f'{self.progress.requested} {run_description} requested for calculation' finished_txt = f'{self.progress.finished} {run_description} finished' noeval_txt = f'{self.results.no_evaluation} {run_description} without evaluation' error_txt = f'{self.results.error} {run_description} ran into an error' print(f'{title}\n {pct_txt} \n {overall} \n {finished_txt} \n {noeval_txt} \n {error_txt}') if print_error_reasons and self.error_reasons is not None and len(self.error_reasons) > 0: print('\nError reasons:') for error_reason in self.error_reasons: ts_count = len(error_reason.timeseries) ts_names = ', '.join(error_reason.timeseries[:3]) # Show first 3 time series if ts_count > 3: ts_names += f' ... and {ts_count - 3} more' print(f' [{error_reason.status}] {error_reason.error_message if error_reason.error_message else ""}') print(f' Affected time series ({ts_count}): {ts_names}')Prints a summary of the status.
Parameters
print_prerequisites:builtins.bool- Enable or disable printing of prerequisite reports.
print_error_reasons:builtins.bool- Enable or disable printing of error reasons.
return:builtins.NoneType
class ReportStatusProgress (**data: Any)-
Expand source code
class ReportStatusProgress(pydantic.BaseModel): """Progress of a forecasting report. Parameters ---------- requested: builtins.int pending: builtins.int finished: builtins.int """ requested: int pending: int finished: intProgress of a forecasting report.
Parameters
requested:builtins.intpending:builtins.intfinished:builtins.int
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var finished : intvar model_configvar pending : intvar requested : int
class ReportStatusResults (**data: Any)-
Expand source code
class ReportStatusResults(pydantic.BaseModel): """Result status of a forecasting report. This only includes runs that are already finished. Parameters ---------- successful: builtins.int no_evaluation: builtins.int error: builtins.int """ successful: int no_evaluation: int error: intResult status of a forecasting report.
This only includes runs that are already finished.
Parameters
successful:builtins.intno_evaluation:builtins.interror:builtins.int
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var error : intvar model_configvar no_evaluation : intvar successful : int
class ReportSummary (**data: Any)-
Expand source code
class ReportSummary(pydantic.BaseModel): """Report ID and description of a report. Parameters ---------- report_id: builtins.int description: builtins.str result_type: builtins.str """ report_id: int description: str result_type: strReport ID and description of a report.
Parameters
report_id:builtins.intdescription:builtins.strresult_type:builtins.str
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.selfis explicitly positional-only to allowselfas a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var description : strvar model_configvar report_id : intvar result_type : str