Module futureexpert.expert_client
Client for connecting with future.
Classes
class ExpertClient (user: Optional[str] = None,
password: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None)-
Expand source code
class ExpertClient:
    """FutureEXPERT client."""

    def __init__(self,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 group: Optional[str] = None,
                 environment: Optional[Literal['production', 'staging', 'development']] = None) -> None:
        """Initializer.

        Parameters
        ----------
        user: typing.Optional
            The username for the _future_ platform.
            If not provided, the username is read from environment variable FUTURE_USER.
        password: typing.Optional
            The password for the _future_ platform.
            If not provided, the password is read from environment variable FUTURE_PW.
        group: typing.Optional
            Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups.
            If not provided, the group is read from the environment variable FUTURE_GROUP.
        environment: typing.Optional
            Optionally the _future_ environment to be used, defaults to production environment.
            If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.

        Raises
        ------
        MissingCredentialsError
            If the username or password is neither passed nor available as environment variable.
        ValueError
            If no group is given and the user is not authorized for exactly one group.
        """
        try:
            future_user = user or os.environ['FUTURE_USER']
        except KeyError:
            raise MissingCredentialsError('username') from None
        try:
            future_password = password or os.environ['FUTURE_PW']
        except KeyError:
            raise MissingCredentialsError('password') from None

        future_group = group or os.getenv('FUTURE_GROUP')
        future_env = cast(Literal['production', 'staging', 'development'],
                          environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')

        self.client = FutureApiClient(user=future_user, password=future_password, environment=future_env)

        authorized_groups = self.client.userinfo['groups']
        if future_group is None and len(authorized_groups) != 1:
            raise ValueError(
                f'You have access to multiple groups. Please select one of the following: {authorized_groups}')

        # The group name is only logged when it was chosen explicitly.
        self.switch_group(new_group=future_group or authorized_groups[0],
                          verbose=future_group is not None)
        self.is_analyst = 'analyst' in self.client.user_roles
        self.forecast_core_id = 'forecast-batch-internal' if self.is_analyst else 'forecast-batch'

    @staticmethod
    def from_dotenv() -> ExpertClient:
        """Create an instance from a .env file or environment variables."""
        dotenv.load_dotenv()
        return ExpertClient()

    def switch_group(self, new_group: str, verbose: bool = True) -> None:
        """Switches the current group.

        Parameters
        ----------
        new_group: builtins.str
            The name of the group to activate.
        verbose: builtins.bool
            If enabled, shows the group name in the log message.

        Raises
        ------
        RuntimeError
            If the user is not authorized to access `new_group`.
        """
        if new_group not in self.client.userinfo['groups']:
            raise RuntimeError(f'You are not authorized to access group {new_group}')
        self.group = new_group
        verbose_text = f' for group {self.group}' if verbose else ''
        logger.info(f'Successfully logged in{verbose_text}.')

    def upload_data(self,
                    source: Union[pd.DataFrame, str],
                    file_specification: Optional[FileSpecification] = None) -> Any:
        """Uploads the given raw data for further processing.

        Parameters
        ----------
        source: typing.Union
            Path to a CSV file or a pandas data frame.
        file_specification: typing.Optional
            If source is a pandas data frame, it will be uploaded as a csv using the specified parameters
            or the default ones. The parameter has no effect if source is a path to a CSV file.

        Returns
        -------
        Identifier for the user Inputs.
        """
        df_file = None
        if isinstance(source, pd.DataFrame):
            if not file_specification:
                file_specification = FileSpecification()
            csv = source.to_csv(index=False,
                                sep=file_specification.delimiter,
                                decimal=file_specification.decimal,
                                encoding='utf-8-sig')
            time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
            df_file = (f'expert-{time_stamp}.csv', csv)
            path = None
        else:
            path = source

        # TODO: currently only one file is supported here.
        upload_feedback = self.client.upload_user_inputs_for_group(self.group, path, df_file)
        return upload_feedback

    def check_data_definition(self,
                              user_input_id: str,
                              file_uuid: str,
                              data_definition: DataDefinition,
                              file_specification: FileSpecification = FileSpecification()) -> Any:
        """Checks the data definition.

        Removes specified rows and columns. Checks if column values have any issues.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The raw result of the CHECK-IN action.

        Raises
        ------
        RuntimeError
            If CHECK-IN reports a non-empty error message.
        """
        payload = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)

        logger.info('Started data definition using CHECK-IN...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished data definition.')
        return result

    def create_time_series(self,
                           user_input_id: str,
                           file_uuid: str,
                           data_definition: Optional[DataDefinition] = None,
                           config_ts_creation: Optional[TsCreationConfig] = None,
                           config_checkin: Optional[str] = None,
                           file_specification: FileSpecification = FileSpecification()) -> Any:
        """Last step of the CHECK-IN process which creates the time series.

        Aggregates the data and saves them to the database.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.
        config_ts_creation: typing.Optional
            Configuration for the time series creation.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the _future_ frontend (now.future-forecasting.de).

        Returns
        -------
        The raw result of the CHECK-IN action.

        Raises
        ------
        ValueError
            If no configuration source, both configuration sources, or `config_ts_creation`
            without `data_definition` is given.
        RuntimeError
            If CHECK-IN reports a non-empty error message.
        """
        logger.info('Transforming input data...')

        if config_ts_creation is None and config_checkin is None:
            raise ValueError('No configuration source is provided.')
        if config_ts_creation is not None and config_checkin is not None:
            raise ValueError('Only one configuration source can be processed.')
        # Exactly one configuration source is set from here on, so the python
        # configuration path can only be incomplete because of the data definition.
        if config_checkin is None and data_definition is None:
            raise ValueError(
                'For checkin configuration via python `data_definition` and `config_ts_creation` must be provided.')

        if config_checkin is not None:
            payload = self._build_payload_from_ui_config(
                user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)
        else:
            payload_1 = self._create_checkin_payload_1(
                user_input_id, file_uuid, data_definition, file_specification)
            payload = self._create_checkin_payload_2(payload_1, config_ts_creation)

        logger.info('Creating time series using CHECK-IN...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

        error_message = result['error']
        if error_message != '':
            raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

        logger.info('Finished time series creation.')
        return result

    def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition]) -> CheckInPoolResult:
        """Create a new version from a list of pool covariates and version ids.

        Parameters
        ----------
        requested_pool_covs: builtins.list
            List of pool covariate definitions. Each definition consists of a pool_cov_id and an optional
            version_id. If no version id is provided, the newest version of the covariate is used.

        Returns
        -------
        Result object with fields version_id and pool_cov_information.
        """
        logger.info('Transforming input data...')

        requested_indicators = []
        for covariate in requested_pool_covs:
            # The payload uses the key `indicator_id` instead of `pool_cov_id`.
            indicator = covariate.model_dump(exclude_none=True)
            indicator.pop('pool_cov_id', None)
            indicator['indicator_id'] = covariate.pool_cov_id
            requested_indicators.append(indicator)
        payload = {'payload': {'requested_indicators': requested_indicators}}

        logger.info('Creating time series using checkin-pool...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id='checkin-pool',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)

        logger.info('Finished time series creation.')
        return CheckInPoolResult(**result['result'])

    def get_pool_cov_overview(self,
                              granularity: Optional[str] = None,
                              search: Optional[str] = None) -> PoolCovOverview:
        """Gets an overview of all covariates available on POOL according to the given filters.

        Parameters
        ----------
        granularity: typing.Optional
            If set, returns only data matching that granularity (Day or Month).
        search: typing.Optional
            If set, performs a full-text search and only returns data found in that search.

        Returns
        -------
        PoolCovOverview object with tables containing the covariates with different levels of detail.
        """
        response_json = self.client.get_pool_cov_overview(granularity=granularity, search=search)
        return PoolCovOverview(response_json)

    def check_in_time_series(self,
                             raw_data_source: Union[pd.DataFrame, str],
                             data_definition: Optional[DataDefinition] = None,
                             config_ts_creation: Optional[TsCreationConfig] = None,
                             config_checkin: Optional[str] = None,
                             file_specification: FileSpecification = FileSpecification()) -> CheckInResult:
        """Checks in time series data that can be used as actuals or covariate data.

        Parameters
        ----------
        raw_data_source: typing.Union
            Data frame that contains the raw data or path to where the CSV file with the data is stored.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns are to be removed.
        config_ts_creation: typing.Optional
            Defines filter and aggregation level of the time series.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        Id of the time series version. Used to identify the time series and the values of the time series.
        """
        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        # Only the first uploaded file is checked in.
        file_id = upload_feedback['files'][0]['uuid']

        response = self.create_time_series(user_input_id=user_input_id,
                                           file_uuid=file_id,
                                           data_definition=data_definition,
                                           config_ts_creation=config_ts_creation,
                                           config_checkin=config_checkin,
                                           file_specification=file_specification)

        result = [TimeSeries(**ts) for ts in response['result']['timeSeries']]
        return CheckInResult(time_series=result,
                             version_id=response['result']['tsVersion']['_id'])

    def _create_checkin_payload_1(self,
                                  user_input_id: str,
                                  file_uuid: str,
                                  data_definition: DataDefinition,
                                  file_specification: FileSpecification = FileSpecification()) -> Any:
        """Creates the payload for the CHECK-IN stage prepareDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        data_definition: futureexpert.checkin.DataDefinition
            Specifies the data, value and group columns and which rows and columns are to be removed first.
        file_specification: futureexpert.checkin.FileSpecification
            Specify the format of the CSV file. Only relevant if a CSV was given as input.

        Returns
        -------
        Dictionary with the user input ID and the payload of the prepareDataset stage.
        """
        def camel_case_dump(column: Any) -> dict[str, Any]:
            """Dumps a column definition without None values and with camelCase keys."""
            return {snake_to_camel(key): value
                    for key, value in column.model_dump(exclude_none=True).items()}

        return {
            'userInputId': user_input_id,
            'payload': {
                'stage': 'prepareDataset',
                'fileUuid': file_uuid,
                'meta': file_specification.model_dump(),
                'performedTasks': {
                    'removedRows': data_definition.remove_rows,
                    'removedCols': data_definition.remove_columns
                },
                'columnDefinition': {
                    # `date_columns` is a single definition that is sent as a one-element list.
                    'dateColumns': [camel_case_dump(data_definition.date_columns)],
                    'valueColumns': [camel_case_dump(d) for d in data_definition.value_columns],
                    'groupColumns': [camel_case_dump(d) for d in data_definition.group_columns]
                }
            }
        }

    def _build_payload_from_ui_config(self, user_input_id: str, file_uuid: str, path: str) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        user_input_id: builtins.str
            UUID of the user input.
        file_uuid: builtins.str
            UUID of the file.
        path: builtins.str
            Path to the JSON file.

        Returns
        -------
        Dictionary with the user input ID and the adapted content of the JSON file as payload.
        """
        # Read explicitly as UTF-8 so the result does not depend on the locale encoding.
        with open(path, encoding='utf-8') as file:
            json_data = json.load(file)

        json_data['stage'] = 'createDataset'
        json_data['fileUuid'] = file_uuid
        # The task log is not part of the payload; tolerate configurations without it.
        json_data.pop('performedTasksLog', None)
        return {'userInputId': user_input_id, 'payload': json_data}

    def _create_checkin_payload_2(self, payload: dict[str, Any], config: TsCreationConfig) -> Any:
        """Creates the payload for the CHECK-IN stage createDataset.

        Parameters
        ----------
        payload: builtins.dict
            Payload used in `create_checkin_payload_1`. It is extended in place.
        config: futureexpert.checkin.TsCreationConfig
            Configuration for time series creation.

        Returns
        -------
        The given payload, extended by the time series parameters and switched to stage createDataset.
        """
        payload['payload']['rawDataReviewResults'] = {}
        payload['payload']['timeSeriesDatasetParameter'] = {
            'aggregation': {'operator': 'sum',
                            'option': config.missing_value_handler},
            'date': {
                'timeGranularity': config.time_granularity,
                'startDate': config.start_date,
                'endDate': config.end_date
            },
            'grouping': {
                'dataLevel': config.grouping_level,
                'filter': [d.model_dump() for d in config.filter]
            },
            'values': [{snake_to_camel(key): value for key, value in d.model_dump().items()}
                       for d in config.new_variables],
            'valueColumnsToSave': config.value_columns_to_save
        }
        payload['payload']['stage'] = 'createDataset'
        return payload

    def _create_forecast_payload(self, version: str, config: ReportConfig) -> Any:
        """Creates the payload for the forecast.

        Parameters
        ----------
        version: builtins.str
            Version of the time series that should get forecasts.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecast run.

        Returns
        -------
        Dictionary holding the dumped and renamed configuration under the key `payload`.
        """
        config_dict = config.model_dump()

        # Copy the values to the key names used in the payload.
        config_dict['actuals_version'] = version
        config_dict['report_note'] = config_dict['title']
        config_dict['cov_selection_report_id'] = config_dict['matcher_report_id']
        config_dict['forecasting']['n_ahead'] = config_dict['forecasting']['fc_horizon']
        config_dict['backtesting'] = config_dict['method_selection']

        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            cast(list[str], config_dict['covs_versions']).append(pool_covs_checkin_result.version_id)

        # Remove the entries that were copied to other keys or checked in above.
        config_dict.pop('pool_covs')
        config_dict.pop('title')
        config_dict['forecasting'].pop('fc_horizon')
        config_dict.pop('matcher_report_id')
        config_dict.pop('method_selection')

        payload = {'payload': config_dict}
        return payload

    def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
        """Starts a forecasting report.

        Parameters
        ----------
        version: builtins.str
            ID of a time series version.
        config: futureexpert.forecast.ReportConfig
            Configuration of the forecasting report. `max_ts_len` and the forecasting methods
            of the method selection are updated on the given object.

        Returns
        -------
        The identifier of the forecasting report.

        Raises
        ------
        ValueError
            If ARIMA is requested as additional covariate method for a granularity below monthly,
            or if a user without the role analyst sets `db_name` or `priority`.
        """
        version_data = self.client.get_ts_version(self.group, version)
        granularity = version_data['customer_specific']['granularity']

        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=granularity)

        if config.method_selection:
            config.method_selection.forecasting_methods = remove_arima_if_not_allowed(
                granularity=granularity,
                methods=config.method_selection.forecasting_methods)

            # Checked inside the guard because it dereferences `method_selection`.
            if granularity in ['weekly', 'daily', 'hourly', 'halfhourly'] \
                    and 'ARIMA' == config.method_selection.additional_cov_method:
                raise ValueError('ARIMA is not supported for granularities below monthly.')

        logger.info('Preparing data for forecast...')

        if not self.is_analyst and (config.db_name is not None or config.priority is not None):
            raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
        payload = self._create_forecast_payload(version, config)
        logger.info('Finished data preparation for forecast.')

        logger.info('Started creating forecasting report with FORECAST...')
        result = self.client.execute_action(group_id=self.group,
                                            core_id=self.forecast_core_id,
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
        logger.info('Finished report creation. Forecasts are running...')
        return ReportIdentifier.model_validate(result)

    def get_report_type(self, report_identifier: int | ReportIdentifier) -> str:
        """Gets the type of the given report.

        Parameters
        ----------
        report_identifier: typing.Union
            Report identifier or plain report ID.

        Returns
        -------
        String representation of the type of one report.
        """
        report_id = report_identifier.report_id if isinstance(
            report_identifier, ReportIdentifier) else report_identifier
        return self.client.get_report_type(group_id=self.group, report_id=report_id)

    def get_reports(self, skip: int = 0, limit: int = 100) -> list[ReportSummary]:
        """Gets the available reports, ordered from newest to oldest.

        Parameters
        ----------
        skip: builtins.int
            The number of initial elements of the report list to skip.
        limit: builtins.int
            The limit on the length of the report list.

        Returns
        -------
        The available reports from newest to oldest.
        """
        group_reports = self.client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
        return [ReportSummary.model_validate(report) for report in group_reports]

    def get_report_status(self,
                          id: Union[ReportIdentifier, int],
                          include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a forecast or matcher report.

        Parameters
        ----------
        id: typing.Union
            Report identifier or plain report ID.
        include_error_reason: builtins.bool
            Determines whether log messages are to be included in the result.

        Returns
        -------
        Status of the report with progress, result counts and, if available, error reasons.
        """
        fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)
        raw_result = self.client.get_report_status(
            group_id=self.group, report_id=fc_identifier.report_id, include_error_reason=include_error_reason)

        # Statuses missing from the summary count as zero.
        report_status = raw_result['status_summary']
        created = report_status.get('Created', 0)
        successful = report_status.get('Successful', 0)
        noeval = report_status.get('NoEvaluation', 0)
        error = report_status.get('Error', 0)
        finished = successful + noeval + error

        summary = ReportStatusProgress(requested=created,
                                       pending=created - finished,
                                       finished=finished)
        results = ReportStatusResults(successful=successful,
                                      no_evaluation=noeval,
                                      error=error)

        return ReportStatus(id=fc_identifier,
                            progress=summary,
                            results=results,
                            error_reasons=raw_result.get('customer_specific', {}).get('log_messages', None))

    def get_fc_results(self,
                       id: Union[ReportIdentifier, int],
                       include_k_best_models: int = 1,
                       include_backtesting: bool = False) -> list[ForecastResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union
            Forecast identifier or plain report ID.
        include_k_best_models: builtins.int
            Number of k best models for which results are to be returned.
        include_backtesting: builtins.bool
            Determines whether backtesting results are to be returned.

        Returns
        -------
        The forecast results of the report.

        Raises
        ------
        ValueError
            If `include_k_best_models` is smaller than one or the report is not a FORECAST report.
        """
        if include_k_best_models < 1:
            raise ValueError('At least one model is needed.')

        if self.get_report_type(report_identifier=id) != 'MongoForecastingResultSink':
            raise ValueError('The given report ID does not belong to a FORECAST result. ' +
                             'Please input a different ID or use get_matcher_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.client.get_fc_results(group_id=self.group,
                                             report_id=report_id,
                                             include_k_best_models=include_k_best_models,
                                             include_backtesting=include_backtesting)

        return [ForecastResult(**result) for result in results]

    def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id: typing.Union
            Report identifier or plain report ID.

        Returns
        -------
        The matcher results of the report.

        Raises
        ------
        ValueError
            If the report is not a MATCHER report.
        """
        if self.get_report_type(report_identifier=id) != 'CovariateSelection':
            raise ValueError('The given report ID does not belong to a MATCHER result. ' +
                             'Please input a different ID or use get_fc_results().')

        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        results = self.client.get_matcher_results(group_id=self.group, report_id=report_id)

        return [MatcherResult(**result) for result in results]

    def start_forecast_from_raw_data(self,
                                     raw_data_source: Union[pd.DataFrame, str],
                                     config_fc: ReportConfig,
                                     data_definition: Optional[DataDefinition] = None,
                                     config_ts_creation: Optional[TsCreationConfig] = None,
                                     config_checkin: Optional[str] = None,
                                     file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
        """Starts a forecast run from raw data without the possibility to inspect interim results
        from the data preparation.

        Parameters
        ----------
        raw_data_source: typing.Union
            A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
        config_fc: futureexpert.forecast.ReportConfig
            The configuration of the forecast run.
        data_definition: typing.Optional
            Specifies the data, value and group columns and which rows and columns should be removed.
        config_ts_creation: typing.Optional
            Defines filter and aggregation level of the time series.
        config_checkin: typing.Optional
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification: futureexpert.checkin.FileSpecification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The identifier of the forecasting report.
        """
        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        # Only the first uploaded file is used.
        file_id = upload_feedback['files'][0]['uuid']

        checkin_response = self.create_time_series(user_input_id=user_input_id,
                                                   file_uuid=file_id,
                                                   data_definition=data_definition,
                                                   config_ts_creation=config_ts_creation,
                                                   config_checkin=config_checkin,
                                                   file_specification=file_specification)

        version = checkin_response['result']['tsVersion']['_id']
        return self.start_forecast(version=version, config=config_fc)

    def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
        """Starts a covariate matcher report.

        Parameters
        ----------
        config: futureexpert.matcher.MatcherConfig
            Configuration of the covariate matcher report. The time series version is taken
            from `config.actuals_version`; `max_ts_len` is updated on the given object.

        Returns
        -------
        The identifier of the covariate matcher report.

        Raises
        ------
        ValueError
            If a user without the role analyst sets `db_name`.
        """
        version_data = self.client.get_ts_version(self.group, config.actuals_version)
        config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                                 granularity=version_data['customer_specific']['granularity'])

        if not self.is_analyst and config.db_name is not None:
            raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')
        payload = self._create_matcher_payload(config)

        result = self.client.execute_action(group_id=self.group,
                                            core_id='cov-selection',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
        logger.info('Finished report creation.')
        return ReportIdentifier.model_validate(result)

    def _create_matcher_payload(self, config: MatcherConfig) -> Any:
        """Converts the MatcherConfig into the payload needed for the cov-selection core.

        Parameters
        ----------
        config: futureexpert.matcher.MatcherConfig
            Configuration of the covariate matcher report. It is not modified.

        Returns
        -------
        Dictionary holding the cov-selection configuration under the key `payload`.
        """
        all_covs_versions = config.covs_versions
        if config.pool_covs is not None:
            pool_covs_checkin_result = self.check_in_pool_covs(requested_pool_covs=config.pool_covs)
            # Build a new list instead of appending, so that the caller's config is not
            # changed and repeated calls do not accumulate pool versions.
            all_covs_versions = [*all_covs_versions, pool_covs_checkin_result.version_id]

        config_dict: dict[str, Any] = {
            'report_description': config.title,
            'db_name': config.db_name,
            'data_config': {
                'actuals_version': config.actuals_version,
                'actuals_filter': config.actuals_filter,
                'covs_versions': all_covs_versions,
                'covs_filter': config.covs_filter,
            },
            "compute_config": {
                "evaluation_start_date": config.evaluation_start_date,
                "evaluation_end_date": config.evaluation_end_date,
                'max_ts_len': config.max_ts_len,
                "base_report_id": None,
                "base_report_requested_run_status": None,
                "report_update_strategy": "KEEP_OWN_RUNS",
                "cov_names": {
                    'cov_name_prefix': '',
                    'cov_name_field': 'name',
                    'cov_name_suffix': '',
                },
                "preselection": {
                    "num_obs_short_term_class": 36,
                    "max_publication_lag": config.max_publication_lag,
                },
                "postselection": {
                    "num_obs_short_term_correlation": 60,
                    "clustering_run_id": None,
                    "post_selection_queries": config.post_selection_queries,
                    "post_selection_concatenation_operator": "&",
                    "protected_selections_queries": [],
                    "protected_selections_concatenation_operator": "&"
                },
                "enable_leading_covariate_selection": config.enable_leading_covariate_selection,
                "fixed_season_length": config.fixed_season_length,
                "lag_selection": {
                    "fixed_lags": config.lag_selection.fixed_lags,
                    "min_lag": config.lag_selection.min_lag,
                    "max_lag": config.lag_selection.max_lag,
                }
            }
        }

        return {'payload': config_dict}
FutureEXPERT client.
Initializer.
Parameters
user
:typing.Optional
- The username for the future platform. If not provided, the username is read from environment variable FUTURE_USER.
password
:typing.Optional
- The password for the future platform. If not provided, the password is read from environment variable FUTURE_PW.
group
:typing.Optional
- Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
environment
:typing.Optional
- Optionally the future environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
Static methods
def from_dotenv() ‑> ExpertClient
-
Expand source code
@staticmethod
def from_dotenv() -> ExpertClient:
    """Build a client that is configured through a .env file or environment variables."""
    # Load the .env file first; the argument-less constructor then reads its settings
    # from the environment.
    dotenv.load_dotenv()
    client = ExpertClient()
    return client
Create an instance from a .env file or environment variables.
Methods
def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any-
Expand source code
def check_data_definition(self,
                          user_input_id: str,
                          file_uuid: str,
                          data_definition: DataDefinition,
                          file_specification: FileSpecification = FileSpecification()) -> Any:
    """Runs the CHECK-IN data definition check.

    The specified rows and columns are removed and the column values are checked for issues.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The raw result of the CHECK-IN action.

    Raises
    ------
    RuntimeError
        If CHECK-IN reports a non-empty error message.
    """
    checkin_payload = self._create_checkin_payload_1(user_input_id,
                                                     file_uuid,
                                                     data_definition,
                                                     file_specification)

    logger.info('Started data definition using CHECK-IN...')
    response = self.client.execute_action(group_id=self.group,
                                          core_id='checkin-preprocessing',
                                          payload=checkin_payload,
                                          interval_status_check_in_seconds=2)

    # An empty error string signals success.
    error_message = response['error']
    if error_message == '':
        logger.info('Finished data definition.')
        return response

    raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')
Checks the data definition.
Removes specified rows and columns. Checks if column values have any issues.
Parameters
user_input_id
:builtins.str
- UUID of the user input.
file_uuid
:builtins.str
- UUID of the file.
data_definition
:DataDefinition
- Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification
:FileSpecification
- Needed if a CSV is used with e.g. German format.
def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition]) ‑> CheckInPoolResult
-
Expand source code
def check_in_pool_covs(self, requested_pool_covs: list[PoolCovDefinition]) -> CheckInPoolResult:
    """Checks in the requested pool covariates as a new version.

    Parameters
    ----------
    requested_pool_covs: builtins.list
        List of pool covariate definitions. Each definition consists of a pool_cov_id and an optional
        version_id. If no version id is provided, the newest version of the covariate is used.

    Returns
    -------
    Result object with fields version_id and pool_cov_information.
    """
    logger.info('Transforming input data...')

    indicators = []
    for definition in requested_pool_covs:
        # The payload carries the covariate ID under `indicator_id` instead of `pool_cov_id`.
        indicator = definition.model_dump(exclude_none=True)
        indicator.pop('pool_cov_id', None)
        indicator['indicator_id'] = definition.pool_cov_id
        indicators.append(indicator)
    payload = {'payload': {'requested_indicators': indicators}}

    logger.info('Creating time series using checkin-pool...')
    response = self.client.execute_action(group_id=self.group,
                                          core_id='checkin-pool',
                                          payload=payload,
                                          interval_status_check_in_seconds=2)
    logger.info('Finished time series creation.')

    checkin_result = response['result']
    return CheckInPoolResult(**checkin_result)
Create a new version from a list of pool covariates and version ids.
Parameters
requested_pool_covs
:builtins.list
- List of pool covariate definitions. Each definition consists of a pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used.
Returns
Result object with fields version_id and pool_cov_information.
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> CheckInResult-
Expand source code
def check_in_time_series(self,
                         raw_data_source: Union[pd.DataFrame, str],
                         data_definition: Optional[DataDefinition] = None,
                         config_ts_creation: Optional[TsCreationConfig] = None,
                         config_checkin: Optional[str] = None,
                         file_specification: FileSpecification = FileSpecification()) -> CheckInResult:
    """Checks in time series data that can be used as actuals or covariate data.

    The raw data is uploaded first; afterwards the time series are created from the
    uploaded file.

    Parameters
    ----------
    raw_data_source
        Data frame that contains the raw data or path to where the CSV file with the data is stored.
    data_definition
        Specifies the data, value and group columns and which rows and columns are to be removed.
    config_ts_creation
        Defines filter and aggregation level of the time series.
    config_checkin
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    Id of the time series version. Used to identify the time series and the values of the time series.
    """
    uploaded = self.upload_data(source=raw_data_source, file_specification=file_specification)

    # Only the first uploaded file is used for the time series creation.
    creation_response = self.create_time_series(user_input_id=uploaded['uuid'],
                                                file_uuid=uploaded['files'][0]['uuid'],
                                                data_definition=data_definition,
                                                config_ts_creation=config_ts_creation,
                                                config_checkin=config_checkin,
                                                file_specification=file_specification)

    created = creation_response['result']
    time_series = [TimeSeries(**raw_ts) for raw_ts in created['timeSeries']]
    return CheckInResult(time_series=time_series, version_id=created['tsVersion']['_id'])
Checks in time series data that can be used as actuals or covariate data.
Parameters
raw_data_source
:typing.Union
- Data frame that contains the raw data or path to where the CSV file with the data is stored.
data_definition
:typing.Optional
- Specifies the data, value and group columns and which rows and columns are to be removed.
config_ts_creation
:typing.Optional
- Defines filter and aggregation level of the time series.
config_checkin
:typing.Optional
- Path to the JSON file with the CHECK-IN configuration.
config_ts_creation
and config_checkin
cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification
:FileSpecification
- Needed if a CSV is used with e.g. German format.
Returns
Id of the time series version. Used to identify the time series and the values of the time series.
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any-
Expand source code
def create_time_series(self,
                       user_input_id: str,
                       file_uuid: str,
                       data_definition: Optional[DataDefinition] = None,
                       config_ts_creation: Optional[TsCreationConfig] = None,
                       config_checkin: Optional[str] = None,
                       file_specification: FileSpecification = FileSpecification()) -> Any:
    """Last step of the CHECK-IN process which creates the time series.

    Aggregates the data and saves them to the database.

    Parameters
    ----------
    user_input_id
        UUID of the user input.
    file_uuid
        UUID of the file.
    data_definition
        Specifies the data, value and group columns and which rows and columns are to be removed first.
        Required when `config_ts_creation` is used; not used when `config_checkin` is given.
    file_specification
        Needed if a CSV is used with e.g. German format.
    config_ts_creation
        Configuration for the time series creation.
    config_checkin
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the _future_ frontend (now.future-forecasting.de).

    Returns
    -------
    The unmodified result of the executed CHECK-IN action.

    Raises
    ------
    ValueError
        If no configuration source, both configuration sources, or `config_ts_creation`
        without `data_definition` is provided.
    RuntimeError
        If the CHECK-IN action reports a non-empty error message.
    """
    logger.info('Transforming input data...')

    if config_ts_creation is None and config_checkin is None:
        raise ValueError('No configuration source is provided.')
    if config_ts_creation is not None and config_checkin is not None:
        raise ValueError('Only one configuration source can be processed.')

    # Exactly one configuration source is set from here on.
    if config_checkin is not None:
        payload = self._build_payload_from_ui_config(user_input_id=user_input_id,
                                                     file_uuid=file_uuid,
                                                     path=config_checkin)
    else:
        if data_definition is None:
            raise ValueError(
                'For checkin configuration via python `data_definition` and `config_ts_creation` must be provided.')
        payload_1 = self._create_checkin_payload_1(user_input_id, file_uuid, data_definition, file_specification)
        payload = self._create_checkin_payload_2(payload_1, config_ts_creation)

    logger.info('Creating time series using CHECK-IN...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id='checkin-preprocessing',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)

    # An empty string in the `error` field means the action succeeded.
    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')

    logger.info('Finished time series creation.')
    return result
Last step of the CHECK-IN process which creates the time series.
Aggregates the data and saves them to the database.
Parameters
user_input_id
:builtins.str
- UUID of the user input.
file_uuid
:builtins.str
- UUID of the file.
data_definition
:typing.Optional
- Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification
:FileSpecification
- Needed if a CSV is used with e.g. German format.
config_ts_creation
:typing.Optional
- Configuration for the time series creation.
config_checkin
:typing.Optional
- Path to the JSON file with the CHECK-IN configuration.
config_ts_creation
and config_checkin
cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False) ‑> list[ForecastResult]-
Expand source code
def get_fc_results(self,
                   id: Union[ReportIdentifier, int],
                   include_k_best_models: int = 1,
                   include_backtesting: bool = False) -> list[ForecastResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id
        Forecast identifier or plain report ID.
    include_k_best_models
        Number of k best models for which results are to be returned.
    include_backtesting
        Determines whether backtesting results are to be returned.

    Raises
    ------
    ValueError
        If fewer than one model is requested or the report is not a FORECAST report.
    """
    if include_k_best_models < 1:
        raise ValueError('At least one model is needed.')

    report_type = self.get_report_type(report_identifier=id)
    if report_type != 'MongoForecastingResultSink':
        raise ValueError('The given report ID does not belong to a FORECAST result. '
                         'Please input a different ID or use get_matcher_results().')

    # Accept both the identifier object and the plain numeric ID.
    if isinstance(id, ReportIdentifier):
        report_id = id.report_id
    else:
        report_id = id

    raw_results = self.client.get_fc_results(group_id=self.group,
                                             report_id=report_id,
                                             include_k_best_models=include_k_best_models,
                                             include_backtesting=include_backtesting)
    return [ForecastResult(**raw_result) for raw_result in raw_results]
Gets the results from the given report.
Parameters
id
:typing.Union
- Forecast identifier or plain report ID.
include_k_best_models
:builtins.int
- Number of k best models for which results are to be returned.
include_backtesting
:builtins.bool
- Determines whether backtesting results are to be returned.
def get_matcher_results(self,
id: Union[ReportIdentifier, int]) ‑> list[MatcherResult]-
Expand source code
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id
        Report identifier or plain report ID.

    Raises
    ------
    ValueError
        If the report is not a MATCHER report.
    """
    report_type = self.get_report_type(report_identifier=id)
    if report_type != 'CovariateSelection':
        raise ValueError('The given report ID does not belong to a MATCHER result. '
                         'Please input a different ID or use get_fc_results().')

    # Accept both the identifier object and the plain numeric ID.
    if isinstance(id, ReportIdentifier):
        report_id = id.report_id
    else:
        report_id = id

    raw_results = self.client.get_matcher_results(group_id=self.group, report_id=report_id)
    return [MatcherResult(**raw_result) for raw_result in raw_results]
Gets the results from the given report.
Parameters
id
:typing.Union
- Report identifier or plain report ID.
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview
-
Expand source code
def get_pool_cov_overview(self,
                          granularity: Optional[str] = None,
                          search: Optional[str] = None) -> PoolCovOverview:
    """Gets an overview of all covariates available on POOL according to the given filters.

    Parameters
    ----------
    granularity
        If set, returns only data matching that granularity (Day or Month).
    search
        If set, performs a full-text search and only returns data found in that search.

    Returns
    -------
    PoolCovOverview object with tables containing the covariates with
    different levels of detail.
    """
    return PoolCovOverview(self.client.get_pool_cov_overview(granularity=granularity, search=search))
Gets an overview of all covariates available on POOL according to the given filters.
Parameters
granularity
:typing.Optional
- If set, returns only data matching that granularity (Day or Month).
search
:typing.Optional
- If set, performs a full-text search and only returns data found in that search.
Returns
PoolCovOverview object with tables containing the covariates with
different levels of detail.
def get_report_status(self,
id: Union[ReportIdentifier, int],
include_error_reason: bool = True) ‑> ReportStatus-
Expand source code
def get_report_status(self,
                      id: Union[ReportIdentifier, int],
                      include_error_reason: bool = True) -> ReportStatus:
    """Gets the current status of a forecast or matcher report.

    Parameters
    ----------
    id
        Report identifier or plain report ID.
    include_error_reason
        Determines whether log messages are to be included in the result.
    """
    # A plain report ID is wrapped into an identifier without settings ID.
    if isinstance(id, ReportIdentifier):
        identifier = id
    else:
        identifier = ReportIdentifier(report_id=id, settings_id=None)

    response = self.client.get_report_status(group_id=self.group,
                                             report_id=identifier.report_id,
                                             include_error_reason=include_error_reason)

    # Counters that are missing in the summary are treated as zero.
    counts = response['status_summary']
    requested = counts.get('Created', 0)
    successful = counts.get('Successful', 0)
    no_evaluation = counts.get('NoEvaluation', 0)
    failed = counts.get('Error', 0)

    finished = successful + no_evaluation + failed
    progress = ReportStatusProgress(requested=requested,
                                    pending=requested - finished,
                                    finished=finished)
    results = ReportStatusResults(successful=successful,
                                  no_evaluation=no_evaluation,
                                  error=failed)
    log_messages = response.get('customer_specific', {}).get('log_messages')

    return ReportStatus(id=identifier,
                        progress=progress,
                        results=results,
                        error_reasons=log_messages)
Gets the current status of a forecast or matcher report.
Parameters
id
:typing.Union
- Report identifier or plain report ID.
include_error_reason
:builtins.bool
- Determines whether log messages are to be included in the result.
def get_report_type(self,
report_identifier: int | ReportIdentifier) ‑> str-
Expand source code
def get_report_type(self, report_identifier: int | ReportIdentifier) -> str:
    """Gets the type of the given report.

    Parameters
    ----------
    report_identifier
        Report identifier or plain report ID.

    Returns
    -------
    String representation of the type of one report.
    """
    # Accept both the identifier object and the plain numeric ID.
    report_id = report_identifier.report_id if isinstance(
        report_identifier, ReportIdentifier) else report_identifier
    return self.client.get_report_type(group_id=self.group, report_id=report_id)
Gets the type of the given report.
Parameters
report_identifier
- Report identifier or plain report ID.
Returns
String representation of the type of one report.
def get_reports(self, skip: int = 0, limit: int = 100) ‑> list[ReportSummary]
-
Expand source code
def get_reports(self, skip: int = 0, limit: int = 100) -> list[ReportSummary]:
    """Gets the available reports, ordered from newest to oldest.

    Parameters
    ----------
    skip
        The number of initial elements of the report list to skip
    limit
        The limit on the length of the report list

    Returns
    -------
    The available reports from newest to oldest.
    """
    raw_reports = self.client.get_group_reports(group_id=self.group, skip=skip, limit=limit)
    return list(map(ReportSummary.model_validate, raw_reports))
Gets the available reports, ordered from newest to oldest.
Parameters
skip
:builtins.int
- The number of initial elements of the report list to skip
limit
:builtins.int
- The limit on the length of the report list
Returns
The available reports from newest to oldest.
def start_forecast(self, version: str, config: ReportConfig) ‑> ReportIdentifier
-
Expand source code
def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier:
    """Starts a forecasting report.

    Note that `config` is modified in place: `max_ts_len` and, if a method selection
    is configured, its `forecasting_methods` are adjusted to the granularity of the
    time series version.

    Parameters
    ----------
    version
        ID of a time series version.
    config
        Configuration of the forecasting report.

    Returns
    -------
    The identifier of the forecasting report.

    Raises
    ------
    ValueError
        If ARIMA is requested as additional covariate method for a granularity below
        monthly, or if a user without the analyst role sets `db_name` or `priority`.
    """
    version_data = self.client.get_ts_version(self.group, version)
    granularity = version_data['customer_specific']['granularity']

    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=granularity)

    method_selection = config.method_selection
    if method_selection:
        method_selection.forecasting_methods = remove_arima_if_not_allowed(
            granularity=granularity,
            methods=method_selection.forecasting_methods)

        # Only evaluated when a method selection exists, so a missing selection
        # can never be dereferenced here.
        if granularity in ('weekly', 'daily', 'hourly', 'halfhourly') \
                and method_selection.additional_cov_method == 'ARIMA':
            raise ValueError('ARIMA is not supported for granularities below monthly.')

    logger.info('Preparing data for forecast...')

    if not self.is_analyst and (config.db_name is not None or config.priority is not None):
        raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.')
    payload = self._create_forecast_payload(version, config)
    logger.info('Finished data preparation for forecast.')

    logger.info('Started creating forecasting report with FORECAST...')
    result = self.client.execute_action(group_id=self.group,
                                        core_id=self.forecast_core_id,
                                        payload=payload,
                                        interval_status_check_in_seconds=2)
    logger.info('Finished report creation. Forecasts are running...')
    return ReportIdentifier.model_validate(result)
Starts a forecasting report.
Parameters
version
:builtins.str
- ID of a time series version.
config
:ReportConfig
- Configuration of the forecasting report.
Returns
The identifier of the forecasting report.
def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier-
Expand source code
def start_forecast_from_raw_data(self,
                                 raw_data_source: Union[pd.DataFrame, str],
                                 config_fc: ReportConfig,
                                 data_definition: Optional[DataDefinition] = None,
                                 config_ts_creation: Optional[TsCreationConfig] = None,
                                 config_checkin: Optional[str] = None,
                                 file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
    """Starts a forecast run from raw data without the possibility to inspect interim results
    from the data preparation.

    Parameters
    ----------
    raw_data_source
        A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
    config_fc
        The configuration of the forecast run.
    data_definition
        Specifies the data, value and group columns and which rows and columns should be removed.
    config_ts_creation
        Defines filter and aggregation level of the time series.
    config_checkin
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The identifier of the forecasting report.
    """
    uploaded = self.upload_data(source=raw_data_source, file_specification=file_specification)

    # Only the first uploaded file is used for the time series creation.
    creation_response = self.create_time_series(user_input_id=uploaded['uuid'],
                                                file_uuid=uploaded['files'][0]['uuid'],
                                                data_definition=data_definition,
                                                config_ts_creation=config_ts_creation,
                                                config_checkin=config_checkin,
                                                file_specification=file_specification)

    ts_version_id = creation_response['result']['tsVersion']['_id']
    return self.start_forecast(version=ts_version_id, config=config_fc)
Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.
Parameters
raw_data_source
:typing.Union
- A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc
:ReportConfig
- The configuration of the forecast run.
data_definition
:typing.Optional
- Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation
:typing.Optional
- Defines filter and aggregation level of the time series.
config_checkin
:typing.Optional
- Path to the JSON file with the CHECK-IN configuration.
config_ts_creation
and config_checkin
cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification
:FileSpecification
- Needed if a CSV is used with e.g. German format.
Returns
The identifier of the forecasting report.
def start_matcher(self, config: MatcherConfig) ‑> ReportIdentifier
-
Expand source code
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
    """Starts a covariate matcher report.

    The time series version is taken from `config.actuals_version`. Note that `config`
    is modified in place: `max_ts_len` is adjusted to the granularity of that version.

    Parameters
    ----------
    config
        Configuration of the covariate matcher report.

    Returns
    -------
    The identifier of the covariate matcher report.

    Raises
    ------
    ValueError
        If a user without the analyst role sets `db_name`.
    """
    version_data = self.client.get_ts_version(self.group, config.actuals_version)
    granularity = version_data['customer_specific']['granularity']
    config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len,
                                             granularity=granularity)

    if not self.is_analyst and config.db_name is not None:
        raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.')

    payload = self._create_matcher_payload(config)

    result = self.client.execute_action(group_id=self.group,
                                        core_id='cov-selection',
                                        payload=payload,
                                        interval_status_check_in_seconds=2)
    logger.info('Finished report creation.')
    return ReportIdentifier.model_validate(result)
Starts a covariate matcher report.
Parameters
version
- ID of a time series version. Note: this is not a parameter of start_matcher; the version is read from config.actuals_version.
config
:MatcherConfig
- Configuration of the covariate matcher report.
Returns
The identifier of the covariate matcher report.
def switch_group(self, new_group: str, verbose: bool = True) ‑> None
-
Expand source code
def switch_group(self, new_group: str, verbose: bool = True) -> None:
    """Switches the current group.

    Parameters
    ----------
    new_group
        The name of the group to activate.
    verbose
        If enabled, shows the group name in the log message.

    Raises
    ------
    RuntimeError
        If the user is not authorized for the requested group.
    """
    authorized_groups = self.client.userinfo['groups']
    if new_group not in authorized_groups:
        raise RuntimeError(f'You are not authorized to access group {new_group}')

    self.group = new_group

    group_suffix = ''
    if verbose:
        group_suffix = f' for group {self.group}'
    logger.info(f'Successfully logged in{group_suffix}.')
Switches the current group.
Parameters
new_group
:builtins.str
- The name of the group to activate.
verbose
:builtins.bool
- If enabled, shows the group name in the log message.
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any-
Expand source code
def upload_data(self,
                source: Union[pd.DataFrame, str],
                file_specification: Optional[FileSpecification] = None) -> Any:
    """Uploads the given raw data for further processing.

    Parameters
    ----------
    source
        Path to a CSV file or a pandas data frame.
    file_specification
        If source is a pandas data frame, it will be uploaded as a csv using the specified
        parameters or the default ones. The parameter has no effect if source is a path
        to a CSV file.

    Returns
    -------
    Identifier for the user Inputs.
    """
    # TODO: currently only one file is supported here.
    if not isinstance(source, pd.DataFrame):
        # A path is handed over as is; no in-memory file is needed.
        return self.client.upload_user_inputs_for_group(self.group, source, None)

    csv_format = file_specification if file_specification else FileSpecification()
    csv_content = source.to_csv(index=False,
                                sep=csv_format.delimiter,
                                decimal=csv_format.decimal,
                                encoding='utf-8-sig')
    # The data frame is uploaded under a generated, timestamped file name.
    time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
    in_memory_file = (f'expert-{time_stamp}.csv', csv_content)

    return self.client.upload_user_inputs_for_group(self.group, None, in_memory_file)
Uploads the given raw data for further processing.
Parameters
source
:typing.Union
- Path to a CSV file or a pandas data frame.
file_specification
:typing.Optional
- If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file.
Returns
Identifier for the user Inputs.
class MissingCredentialsError (missing_credential_type: str)
-
Expand source code
class MissingCredentialsError(RuntimeError):
    """Raised when a credential required for the login has not been provided."""

    def __init__(self, missing_credential_type: str) -> None:
        """Initializer.

        Parameters
        ----------
        missing_credential_type
            Name of the missing credential (e.g. 'username' or 'password'); it is
            inserted into the error message.
        """
        super().__init__(f'Please enter {missing_credential_type} either when '
                         'initializing the expert client or in the .env file!')
Unspecified run-time error.
Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException
class ReportIdentifier (**data: Any)
-
Expand source code
class ReportIdentifier(pydantic.BaseModel):
    """Identifies a report by its report ID and an optional settings ID."""
    # ID of the report.
    report_id: int
    # ID of the settings belonging to the report; may be None.
    # NOTE(review): no default is declared, so the field presumably has to be passed
    # explicitly (even as None) - confirm against the pydantic version in use.
    settings_id: Optional[int]
Usage docs: https://docs.pydantic.dev/2.10/concepts/models/
A base class for creating Pydantic models.
Attributes
__class_vars__
- The names of the class variables defined on the model.
__private_attributes__
- Metadata about the private attributes of the model.
__signature__
- The synthesized
__init__
[Signature
][inspect.Signature] of the model. __pydantic_complete__
- Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
- The core schema of the model.
__pydantic_custom_init__
- Whether the model has a custom
__init__
function. __pydantic_decorators__
- Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
and Model.__root_validators__
from Pydantic V1. __pydantic_generic_metadata__
- Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
- Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
- The name of the post-init method for the model, if defined.
__pydantic_root_model__
- Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. __pydantic_serializer__
- The
pydantic-core
SchemaSerializer
used to dump instances of the model. __pydantic_validator__
- The
pydantic-core
SchemaValidator
used to validate instances of the model. __pydantic_fields__
- A dictionary of field names and their corresponding [
FieldInfo
][pydantic.fields.FieldInfo] objects. __pydantic_computed_fields__
- A dictionary of computed field names and their corresponding [
ComputedFieldInfo
][pydantic.fields.ComputedFieldInfo] objects. __pydantic_extra__
- A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. __pydantic_fields_set__
- The names of fields explicitly set during instantiation.
__pydantic_private__
- Values of private attributes set on the model instance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var model_config
var report_id : int
var settings_id : int | None
class ReportStatus (**data: Any)
-
Expand source code
class ReportStatus(pydantic.BaseModel):
    """Status of a forecast or matcher report."""
    id: ReportIdentifier
    progress: ReportStatusProgress
    results: ReportStatusResults
    error_reasons: Optional[Any] = None

    @property
    def is_finished(self) -> bool:
        """Indicates whether a forecasting report is finished."""
        pending_runs = self.progress.pending
        return pending_runs == 0

    def print(self) -> None:
        """Prints a summary of the status."""
        title = f'Status forecasting report for id: {self.id}'
        progress = self.progress

        # Without any requested run there is nothing to summarize (and no percentage to compute).
        if progress.requested == 0:
            print(f'{title}\n No run was created')
            return

        percent_finished = round(progress.finished/progress.requested*100)
        summary_lines = [
            f'{percent_finished} % are finished',
            f'{progress.requested} time series requested for calculation',
            f'{progress.finished} time series finished',
            f'{self.results.no_evaluation} time series without evaluation',
            f'{self.results.error} time series ran into an error',
        ]
        print(f'{title}\n ' + ' \n '.join(summary_lines))
Status of a forecast or matcher report.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var error_reasons : Any | None
var id : ReportIdentifier
var model_config
var progress : ReportStatusProgress
var results : ReportStatusResults
Instance variables
prop is_finished : bool
-
Expand source code
@property
def is_finished(self) -> bool:
    """Indicates whether a forecasting report is finished."""
    # Finished means that no run is pending anymore.
    pending_runs = self.progress.pending
    return pending_runs == 0
Indicates whether a forecasting report is finished.
Methods
def print(self) ‑> None
-
Expand source code
def print(self) -> None:
    """Prints a summary of the status."""
    title = f'Status forecasting report for id: {self.id}'
    progress = self.progress

    # Without any requested run there is nothing to summarize (and no percentage to compute).
    if progress.requested == 0:
        print(f'{title}\n No run was created')
        return

    percent_finished = round(progress.finished/progress.requested*100)
    summary_lines = [
        f'{percent_finished} % are finished',
        f'{progress.requested} time series requested for calculation',
        f'{progress.finished} time series finished',
        f'{self.results.no_evaluation} time series without evaluation',
        f'{self.results.error} time series ran into an error',
    ]
    print(f'{title}\n ' + ' \n '.join(summary_lines))
Prints a summary of the status.
class ReportStatusProgress (**data: Any)
-
Expand source code
class ReportStatusProgress(pydantic.BaseModel):
    """Progress of a forecasting report."""
    # Number of time series requested for calculation.
    requested: int
    # Number of requested time series that are not finished yet.
    pending: int
    # Number of time series that are finished.
    finished: int
Progress of a forecasting report.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var finished : int
var model_config
var pending : int
var requested : int
class ReportStatusResults (**data: Any)
-
Expand source code
class ReportStatusResults(pydantic.BaseModel):
    """Result status of a forecasting report.

    This only includes runs that are already finished."""
    # Number of successfully finished time series.
    successful: int
    # Number of time series without evaluation.
    no_evaluation: int
    # Number of time series that ran into an error.
    error: int
Result status of a forecasting report.
This only includes runs that are already finished.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var error : int
var model_config
var no_evaluation : int
var successful : int
class ReportSummary (**data: Any)
-
Expand source code
class ReportSummary(pydantic.BaseModel):
    """Summary of a report, consisting of its ID and its description."""
    # ID of the report.
    report_id: int
    # Description of the report.
    description: str
Usage docs: https://docs.pydantic.dev/2.10/concepts/models/
A base class for creating Pydantic models.
Attributes
__class_vars__
- The names of the class variables defined on the model.
__private_attributes__
- Metadata about the private attributes of the model.
__signature__
- The synthesized
__init__
[Signature
][inspect.Signature] of the model. __pydantic_complete__
- Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
- The core schema of the model.
__pydantic_custom_init__
- Whether the model has a custom
__init__
function. __pydantic_decorators__
- Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
and Model.__root_validators__
from Pydantic V1. __pydantic_generic_metadata__
- Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
- Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
- The name of the post-init method for the model, if defined.
__pydantic_root_model__
- Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. __pydantic_serializer__
- The
pydantic-core
SchemaSerializer
used to dump instances of the model. __pydantic_validator__
- The
pydantic-core
SchemaValidator
used to validate instances of the model. __pydantic_fields__
- A dictionary of field names and their corresponding [
FieldInfo
][pydantic.fields.FieldInfo] objects. __pydantic_computed_fields__
- A dictionary of computed field names and their corresponding [
ComputedFieldInfo
][pydantic.fields.ComputedFieldInfo] objects. __pydantic_extra__
- A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. __pydantic_fields_set__
- The names of fields explicitly set during instantiation.
__pydantic_private__
- Values of private attributes set on the model instance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var description : str
var model_config
var report_id : int