Package futureexpert
Sub-modules
futureexpert.associator
Contains the models with the configuration for the associator and the result format.
futureexpert.checkin
Contains the models with the configuration for CHECK-IN.
futureexpert.expert_client
Client for connecting with the _future_ platform.
futureexpert.forecast
Contains the models with the configuration for the forecast and the result format.
futureexpert.forecast_consistency
Contains the models with the configuration for the hierarchical reconciliation and the result format.
futureexpert.matcher
Contains the models with the configuration for the matcher and the result format.
futureexpert.plot
Contains all the functionality to plot the checked-in time series as well as the forecast and backtesting results.
futureexpert.pool
futureexpert.shared_models
Shared models used across multiple modules.
Classes
class ActualsCovsConfiguration(**data: Any)
```python
class ActualsCovsConfiguration(BaseModel):
    """Configuration of actuals and covariates via name and lag.

    Parameters
    ----------
    actuals_name: builtins.str
        Name of the time series.
    covs_configurations: builtins.list[futureexpert.shared_models.CovariateRef]
        List of Covariates.
    """
    actuals_name: str
    covs_configurations: list[CovariateRef]
```
Configuration of actuals and covariates via name and lag.
Parameters
actuals_name : builtins.str - Name of the time series.
covs_configurations : builtins.list[CovariateRef] - List of covariates.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var actuals_name : str
var covs_configurations : list[CovariateRef]
var model_config
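A minimal usage sketch: pairing one actuals series with a lagged covariate reference. The series and covariate names are placeholders; the import assumes the classes are exposed at the package root, as listed on this page.

```python
from futureexpert import ActualsCovsConfiguration, CovariateRef

# 'monthly_sales' and 'consumer_climate' are hypothetical names; the covariate
# enters with a lag of 2 periods.
config = ActualsCovsConfiguration(
    actuals_name='monthly_sales',
    covs_configurations=[CovariateRef(name='consumer_climate', lag=2)])
```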
class AssociatorConfig(**data: Any)
```python
class AssociatorConfig(BaseConfig):
    """Service configuration.

    Parameters
    ----------
    data_selection: futureexpert.associator.DataSelection
        Configuration on the selection of time series used for carrying out the service.
    trend_detection: futureexpert.associator.TrendDetectionConfiguration
        Configuration for trend detection.
    clustering: futureexpert.associator.ClusteringConfiguration
        Configuration for clustering.
    report_note: builtins.str
        User-defined string to be included in the report.
    db_name: typing.Optional[builtins.str]
        Only accessible for internal use. Name of the database to use for storing the results.
    """
    data_selection: DataSelection = Field(default_factory=DataSelection)
    trend_detection: TrendDetectionConfiguration = Field(default_factory=TrendDetectionConfiguration)
    clustering: ClusteringConfiguration = Field(default_factory=ClusteringConfiguration)
    report_note: str
    db_name: Optional[str] = None
```
Service configuration.
Parameters
data_selection : DataSelection - Configuration on the selection of time series used for carrying out the service.
trend_detection : TrendDetectionConfiguration - Configuration for trend detection.
clustering : ClusteringConfiguration - Configuration for clustering.
report_note : builtins.str - User-defined string to be included in the report.
db_name : typing.Optional[builtins.str] - Only accessible for internal use. Name of the database to use for storing the results.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var clustering : ClusteringConfiguration
var data_selection : DataSelection
var db_name : str | None
var model_config
var report_note : str
var trend_detection : TrendDetectionConfiguration
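A sketch of a minimal associator configuration: only `report_note` has no default, the nested configurations fall back to their defaults when omitted. The note text and the cluster count are placeholder values.

```python
from futureexpert import AssociatorConfig, ClusteringConfiguration, DataSelection

config = AssociatorConfig(
    report_note='Quarterly product clustering',          # free-text note, placeholder value
    data_selection=DataSelection(),                       # latest version, no metadata filter
    clustering=ClusteringConfiguration(n_clusters=5))     # cluster into 5 groups
```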
class ClusteringConfiguration(**data: Any)
```python
class ClusteringConfiguration(BaseConfig):
    """Configuration for clustering.

    If start_time or end_time is not provided, then the missing(s) of the two will be determined
    automatically; the final four parameters govern this process.

    Parameters
    ----------
    create_clusters: builtins.bool
        If True, then the service will attempt clustering.
    n_clusters: builtins.int
        Number of clusters of complete and non-constant time series.
    start_time: typing.Optional[datetime.datetime]
        Observations from start_time (inclusive) onwards will be considered during clustering.
    end_time: typing.Optional[datetime.datetime]
        Observations up to end_time (inclusive) will be considered during clustering.
    """
    create_clusters: bool = True
    n_clusters: int = Field(default=None, gt=0)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    @model_validator(mode='after')
    def validate_times(self) -> 'ClusteringConfiguration':
        if self.start_time is not None and self.end_time is not None and self.start_time > self.end_time:
            raise ValueError('End time precedes start time.')
        return self
```
Configuration for clustering.
If start_time or end_time is not provided, then the missing one(s) of the two will be determined automatically; the final four parameters govern this process.
Parameters
create_clusters : builtins.bool - If True, then the service will attempt clustering.
n_clusters : builtins.int - Number of clusters of complete and non-constant time series.
start_time : typing.Optional[datetime.datetime] - Observations from start_time (inclusive) onwards will be considered during clustering.
end_time : typing.Optional[datetime.datetime] - Observations up to end_time (inclusive) will be considered during clustering.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var create_clusters : bool
var end_time : datetime.datetime | None
var model_config
var n_clusters : int
var start_time : datetime.datetime | None
Methods
def validate_times(self) -> ClusteringConfiguration
```python
@model_validator(mode='after')
def validate_times(self) -> 'ClusteringConfiguration':
    if self.start_time is not None and self.end_time is not None and self.start_time > self.end_time:
        raise ValueError('End time precedes start time.')
    return self
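A sketch of a ClusteringConfiguration restricted to a fixed observation window; the cluster count and dates are placeholder values.

```python
from datetime import datetime
from futureexpert import ClusteringConfiguration

# Cluster complete, non-constant series into 4 groups, using observations from 2020 through 2023.
clustering = ClusteringConfiguration(
    n_clusters=4,
    start_time=datetime(2020, 1, 1),
    end_time=datetime(2023, 12, 31))

# Swapping the two timestamps would raise a pydantic ValidationError, because
# validate_times() rejects an end_time that precedes start_time.
```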
class CovariateRef(**data: Any)
```python
class CovariateRef(BaseModel):
    """Covariate reference.

    Parameters
    ----------
    name: builtins.str
        Name of the Covariate
    lag: builtins.int
        Lag by which the covariate was used.
    """
    name: str
    lag: int
```
Covariate reference.
Parameters
name : builtins.str - Name of the covariate.
lag : builtins.int - Lag by which the covariate was used.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var lag : int
var model_config
var name : str
class DataDefinition(**data: Any)
```python
class DataDefinition(BaseConfig):
    """Model for the input parameter needed for the first CHECK-IN step.

    Every single column in your data must be accounted for. Each column must either be assigned
    a type (`date_column`, `value_columns`, `group_columns`) or be explicitly marked for removal
    in `remove_columns`.

    Parameters
    ----------
    date_column: futureexpert.checkin.DateColumn
        Definition of the date column. Must be a single column that contains the complete date information.
    value_columns: builtins.list[futureexpert.checkin.ValueColumn]
        Definitions of the value columns. Not all columns defined here must be used for time series
        creation; selecting a subset or combining is possible in a later step.
    group_columns: builtins.list[futureexpert.checkin.GroupColumn]
        Definitions of the group columns. Not all columns defined here must be used for time series
        creation; selecting a subset is possible in a later step. Grouping information can also be
        used to create hierarchical levels.
    remove_rows: typing.Optional[builtins.list[builtins.int]]
        Indexes of the rows to be removed before validation.
        Note: If the raw data was committed as pandas data frame the header is the first row (row index 0).
    remove_columns: typing.Optional[builtins.list[builtins.int]]
        Indexes of the columns to be removed before validation.
        Any column that is not assigned a type must be listed here.
    """
    date_column: DateColumn
    value_columns: list[ValueColumn]
    group_columns: list[GroupColumn] = []
    remove_rows: Optional[list[int]] = []
    remove_columns: Optional[list[int]] = []
```
Model for the input parameter needed for the first CHECK-IN step. Every single column in your data must be accounted for. Each column must either be assigned a type (`date_column`, `value_columns`, `group_columns`) or be explicitly marked for removal in `remove_columns`.
Parameters
date_column : DateColumn - Definition of the date column. Must be a single column that contains the complete date information.
value_columns : builtins.list[ValueColumn] - Definitions of the value columns. Not all columns defined here must be used for time series creation; selecting a subset or combining is possible in a later step.
group_columns : builtins.list[GroupColumn] - Definitions of the group columns. Not all columns defined here must be used for time series creation; selecting a subset is possible in a later step. Grouping information can also be used to create hierarchical levels.
remove_rows : typing.Optional[builtins.list[builtins.int]] - Indexes of the rows to be removed before validation. Note: If the raw data was provided as a pandas data frame, the header is the first row (row index 0).
remove_columns : typing.Optional[builtins.list[builtins.int]] - Indexes of the columns to be removed before validation. Any column that is not assigned a type must be listed here.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var date_column : DateColumn
var group_columns : list[GroupColumn]
var model_config
var remove_columns : list[int] | None
var remove_rows : list[int] | None
var value_columns : list[ValueColumn]
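A sketch of a DataDefinition for a four-column file. The `DateColumn`, `ValueColumn` and `GroupColumn` constructor arguments shown (`name`, `format`) are assumptions based on the parameter descriptions above, not confirmed signatures; the column names and indices are placeholders.

```python
from futureexpert import DataDefinition
from futureexpert.checkin import DateColumn, GroupColumn, ValueColumn

# Assume column 0 holds the date, column 1 a sales value, column 2 a region grouping
# and column 3 an unused comment column that must be removed explicitly.
data_definition = DataDefinition(
    date_column=DateColumn(name='Date', format='%Y-%m-%d'),  # assumed constructor arguments
    value_columns=[ValueColumn(name='Sales')],
    group_columns=[GroupColumn(name='Region')],
    remove_columns=[3])
```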
class DataSelection(**data: Any)
```python
class DataSelection(BaseConfig):
    """Time series selection.

    Parameters
    ----------
    version: typing.Optional[builtins.str]
        Time series version to be used. If None, then the latest version is used.
    filter: builtins.dict[builtins.str, typing.Any]
        Filter to select a subset of time series based on their metadata.
    """
    version: Optional[str] = None
    filter: dict[str, Any] = Field(default_factory=dict)
```
Time series selection.
Parameters
version : typing.Optional[builtins.str] - Time series version to be used. If None, then the latest version is used.
filter : builtins.dict[builtins.str, typing.Any] - Filter to select a subset of time series based on their metadata.
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var filter : dict[str, typing.Any]
var model_config
var version : str | None
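A sketch of a DataSelection that keeps the latest version and filters by metadata. The filter key and value are placeholders; which keys are valid depends on the metadata stored with your time series.

```python
from futureexpert import DataSelection

selection = DataSelection(filter={'group_region': 'EMEA'})  # hypothetical metadata key/value
```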
class ExpertClient (user: Optional[str] = None,
password: Optional[str] = None,
totp: Optional[str] = None,
refresh_token: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None)-
```python
class ExpertClient:
    """FutureEXPERT client."""

    def __init__(self,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 totp: Optional[str] = None,
                 refresh_token: Optional[str] = None,
                 group: Optional[str] = None,
                 environment: Optional[Literal['production', 'staging', 'development']] = None) -> None:
        """Initializer.

        Login using either your user credentials or a valid refresh token.

        Parameters
        ----------
        user
            The username for the _future_ platform.
            If not provided, the username is read from environment variable FUTURE_USER.
        password
            The password for the _future_ platform.
            If not provided, the password is read from environment variable FUTURE_PW.
        totp
            Optional second factor for authentication using user credentials.
        refresh_token
            Alternative login using a refresh token only instead of user credentials.
            You can retrieve a long-lived refresh token (offline token) from our identity provider
            using Open ID Connect scope `offline_access` at the token endpoint. Example:

            curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' \
              -H 'Content-Type: application/x-www-form-urlencoded' \
              --data-urlencode 'client_id=expert' \
              --data-urlencode 'grant_type=password' \
              --data-urlencode 'scope=openid offline_access' \
              --data-urlencode "username=$FUTURE_USER" \
              --data-urlencode "password=$FUTURE_PW" | jq .refresh_token
        group
            Optionally the name of the futureEXPERT group. Only relevant if the user has access to
            multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
        environment
            Optionally the _future_ environment to be used, defaults to production environment.
            If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
        """
        future_env = cast(Literal['production', 'staging', 'development'],
                          environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')

        if refresh_token:
            self.api_client = FutureApiClient(refresh_token=refresh_token, environment=future_env)
        else:
            try:
                future_user = user or os.environ['FUTURE_USER']
            except KeyError:
                raise MissingCredentialsError('username') from None
            try:
                future_password = password or os.environ['FUTURE_PW']
            except KeyError:
                raise MissingCredentialsError('password') from None
            self.api_client = FutureApiClient(user=future_user,
                                              password=future_password,
                                              environment=future_env,
                                              totp=totp)

        authorized_groups = self.api_client.userinfo['groups']
        future_group = group or os.getenv('FUTURE_GROUP')
        if future_group is None and len(authorized_groups) != 1:
            raise ValueError(
                f'You have access to multiple groups. Please select one of the following: {authorized_groups}')
        self.switch_group(new_group=future_group or authorized_groups[0],
                          verbose=future_group is not None)
        self.is_analyst = 'analyst' in self.api_client.user_roles
        self.forecast_core_id = 'forecast-batch-internal' if self.is_analyst else 'forecast-batch'
        self.matcher_core_id = 'cov-selection-internal' if self.is_analyst else 'cov-selection'
        self.associator_core_id = 'associator'
        self.hcfc_core_id = 'hcfc'

    # ... (the remaining methods are reproduced in the individual method listings below)
```
FutureEXPERT client.
Initializer.
Login using either your user credentials or a valid refresh token.
Parameters
user - The username for the _future_ platform. If not provided, the username is read from environment variable FUTURE_USER.
password - The password for the _future_ platform. If not provided, the password is read from environment variable FUTURE_PW.
totp - Optional second factor for authentication using user credentials.
refresh_token - Alternative login using a refresh token only instead of user credentials.
You can retrieve a long-lived refresh token (offline token) from our identity provider
using Open ID Connect scope `offline_access` at the token endpoint. Example:
curl -s -X POST 'https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token' -H 'Content-Type: application/x-www-form-urlencoded' --data-urlencode 'client_id=expert' --data-urlencode 'grant_type=password' --data-urlencode 'scope=openid offline_access' --data-urlencode "username=$FUTURE_USER" --data-urlencode "password=$FUTURE_PW" | jq .refresh_token
group - Optionally the name of the futureEXPERT group. Only relevant if the user has access to multiple groups. If not provided, the group is read from the environment variable FUTURE_GROUP.
environment - Optionally the _future_ environment to be used, defaults to production environment. If not provided, the environment is read from the environment variable FUTURE_ENVIRONMENT.
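A sketch of the two login variants described above; the credential and group values are placeholders.

```python
from futureexpert import ExpertClient

# Log in with explicit credentials ...
client = ExpertClient(user='my.user@example.com', password='my-secret', group='my-group')

# ... or rely on FUTURE_USER / FUTURE_PW / FUTURE_GROUP from a .env file or the environment.
client = ExpertClient.from_dotenv()
```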
Static methods
def from_dotenv() -> ExpertClient
```python
@staticmethod
def from_dotenv() -> ExpertClient:
    """Create an instance from a .env file or environment variables.

    Parameters
    ----------
    return: futureexpert.expert_client.ExpertClient
    """
    dotenv.load_dotenv()
    return ExpertClient()
```
Create an instance from a .env file or environment variables.
Methods
def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) -> Any
```python
def check_data_definition(self,
                          user_input_id: str,
                          file_uuid: str,
                          data_definition: DataDefinition,
                          file_specification: FileSpecification = FileSpecification()) -> Any:
    """Checks the data definition.

    Removes specified rows and columns. Checks if column values have any issues.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    return: typing.Any
    """
    payload = self._create_checkin_payload_1(
        user_input_id, file_uuid, data_definition, file_specification)

    logger.info('Started data definition using CHECK-IN...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')
    logger.info('Finished data definition.')
    return result
```
Checks the data definition.
Removes specified rows and columns. Checks if column values have any issues.
Parameters
user_input_id : builtins.str - UUID of the user input.
file_uuid : builtins.str - UUID of the file.
data_definition : DataDefinition - Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification : FileSpecification - Needed if a CSV is used with e.g. German format.
return : typing.Any
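A sketch of the two-step flow, reusing the `client` and `data_definition` objects from the earlier sketches. The feedback dictionary keys (`uuid`, `files`) follow the usage shown in the check_in_time_series listing below; the data values are placeholders.

```python
import pandas as pd

# Upload raw data first, then validate the data definition against it.
raw_data = pd.DataFrame({'Date': ['2024-01-01', '2024-02-01'], 'Sales': [100, 120]})
feedback = client.upload_data(source=raw_data)

result = client.check_data_definition(
    user_input_id=feedback['uuid'],
    file_uuid=feedback['files'][0]['uuid'],
    data_definition=data_definition)
```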
def check_in_pool_covs(self,
requested_pool_covs: list[PoolCovDefinition],
description: Optional[str] = None) -> CheckInPoolResult
```python
def check_in_pool_covs(self,
                       requested_pool_covs: list[PoolCovDefinition],
                       description: Optional[str] = None) -> CheckInPoolResult:
    """Create a new version from a list of pool covariates and version ids.

    Parameters
    ----------
    requested_pool_covs: builtins.list[futureexpert.pool.PoolCovDefinition]
        List of pool covariate definitions. Each definition consists of an pool_cov_id and an
        optional version_id. If no version id is provided, the newest version of the covariate is used.
    description: typing.Optional[builtins.str]
        A short description of the selected covariates.

    Returns
    -------
    Result object with fields version_id and pool_cov_information.

    return: futureexpert.pool.CheckInPoolResult
    """
    logger.info('Transforming input data...')
    payload: dict[str, Any] = {
        'payload': {
            'requested_indicators': [
                {**covariate.model_dump(exclude_none=True), 'indicator_id': covariate.pool_cov_id}
                for covariate in requested_pool_covs
            ]
        }
    }
    for covariate in payload['payload']['requested_indicators']:
        covariate.pop('pool_cov_id', None)
    payload['payload']['version_description'] = description

    logger.info('Creating time series using checkin-pool...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-pool',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
    logger.info('Finished time series creation.')
    return CheckInPoolResult(**result['result'])
```
Create a new version from a list of pool covariates and version ids.
Parameters
requested_pool_covs : builtins.list[PoolCovDefinition] - List of pool covariate definitions. Each definition consists of a pool_cov_id and an optional version_id. If no version id is provided, the newest version of the covariate is used.
description : typing.Optional[builtins.str] - A short description of the selected covariates.
Returns
- Result object with fields version_id and pool_cov_information.
return : CheckInPoolResult
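A sketch of checking in one POOL covariate, reusing the `client` from the earlier sketches. The pool_cov_id value is a placeholder; omitting version_id selects the newest version of the covariate.

```python
from futureexpert.pool import PoolCovDefinition

pool_result = client.check_in_pool_covs(
    requested_pool_covs=[PoolCovDefinition(pool_cov_id='ifo_business_climate')],  # hypothetical id
    description='Business climate indicators')
print(pool_result.version_id)
```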
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) -> str
```python
def check_in_time_series(self,
                         raw_data_source: Union[pd.DataFrame, str],
                         data_definition: Optional[DataDefinition] = None,
                         config_ts_creation: Optional[TsCreationConfig] = None,
                         config_checkin: Optional[str] = None,
                         file_specification: FileSpecification = FileSpecification()) -> str:
    """Checks in time series data that can be used as actuals or covariate data.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str]
        Data frame that contains the raw data or path to where the CSV file with the data is stored.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns are to be removed.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Defines filter and aggreagtion level of the time series.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration.
        `config_ts_creation` and `config_checkin` cannot be set simultaneously.
        The configuration may be obtained from the last step of CHECK-IN using the
        future frontend (now.future-forecasting.de).
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    Id of the time series version. Used to identifiy the time series.

    return: builtins.str
    """
    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    response = self.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_id,
                                       data_definition=data_definition,
                                       config_ts_creation=config_ts_creation,
                                       config_checkin=config_checkin,
                                       file_specification=file_specification)

    return str(response['result']['tsVersion'])
```
Checks in time series data that can be used as actuals or covariate data.
Parameters
raw_data_source : typing.Union[pandas.core.frame.DataFrame, builtins.str] - Data frame that contains the raw data or path to where the CSV file with the data is stored.
data_definition : typing.Optional[DataDefinition] - Specifies the data, value and group columns and which rows and columns are to be removed.
config_ts_creation : typing.Optional[TsCreationConfig] - Defines filter and aggregation level of the time series.
config_checkin : typing.Optional[builtins.str] - Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the _future_ frontend (now.future-forecasting.de).
file_specification : FileSpecification - Needed if a CSV is used with e.g. German format.
Returns
- Id of the time series version. Used to identify the time series.
return : builtins.str
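A sketch of both configuration variants, reusing the `client` and `data_definition` from the earlier sketches. The file paths are placeholders, and the TsCreationConfig arguments are assumptions based on the fields referenced in the payload construction (time_granularity, value_columns_to_save, description), not a confirmed signature.

```python
from futureexpert.checkin import TsCreationConfig

# Variant 1: configuration exported from the CHECK-IN frontend (path is a placeholder).
version_id = client.check_in_time_series(
    raw_data_source='sales.csv',
    config_checkin='checkin_config.json')

# Variant 2: pure Python configuration (argument values are placeholders).
version_id = client.check_in_time_series(
    raw_data_source='sales.csv',
    data_definition=data_definition,
    config_ts_creation=TsCreationConfig(time_granularity='monthly',
                                        value_columns_to_save=['Sales'],
                                        description='Monthly sales'))
```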
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) -> Any
```python
def create_time_series(self,
                       user_input_id: str,
                       file_uuid: str,
                       data_definition: Optional[DataDefinition] = None,
                       config_ts_creation: Optional[TsCreationConfig] = None,
                       config_checkin: Optional[str] = None,
                       file_specification: FileSpecification = FileSpecification()) -> Any:
    """Last step of the CHECK-IN process which creates the time series.

    Aggregates the data and saves them to the database.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Specifies the data, value and group columns and which rows and columns are to be removed first.
    file_specification: futureexpert.checkin.FileSpecification
        Needed if a CSV is used with e.g. German format.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Configuration for the time series creation.
    config_checkin: typing.Optional[builtins.str]
        Path to the JSON file with the CHECK-IN configuration.
        `config_ts_creation` and `config_checkin` cannot be set simultaneously.
        The configuration may be obtained from the last step of CHECK-IN using the
        _future_ frontend (now.future-forecasting.de).

    return: typing.Any
    """
    logger.info('Transforming input data...')
    if config_ts_creation is None and config_checkin is None:
        raise ValueError('No configuration source is provided.')

    if config_ts_creation is not None and config_checkin is not None:
        raise ValueError('Only one configuration source can be processed.')

    if config_checkin is None and (data_definition is None or config_ts_creation is None):
        raise ValueError(
            'For checkin configuration via python `data_defintion`and `config_ts_cration` must be provided.')

    if config_ts_creation is not None and data_definition is not None:
        payload_1 = self._create_checkin_payload_1(
            user_input_id, file_uuid, data_definition, file_specification)
        payload = self._create_checkin_payload_2(payload_1, config_ts_creation)

    if config_checkin is not None:
        payload = self._build_payload_from_ui_config(
            user_input_id=user_input_id, file_uuid=file_uuid, path=config_checkin)

    logger.info('Creating time series using CHECK-IN...')
    result = self.api_client.execute_action(group_id=self.group,
                                            core_id='checkin-preprocessing',
                                            payload=payload,
                                            interval_status_check_in_seconds=2)
    error_message = result['error']
    if error_message != '':
        raise RuntimeError(f'Error during the execution of CHECK-IN: {error_message}')
    logger.info('Finished time series creation.')
    return result
```
Last step of the CHECK-IN process which creates the time series.
Aggregates the data and saves them to the database.
Parameters
user_input_id:builtins.str- UUID of the user input.
file_uuid:builtins.str- UUID of the file.
data_definition:typing.Optional[DataDefinition]- Specifies the data, value and group columns and which rows and columns are to be removed first.
file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
config_ts_creation:typing.Optional[TsCreationConfig]- Configuration for the time series creation.
config_checkin:typing.Optional[builtins.str]- Path to the JSON file with the CHECK-IN configuration.
`config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
return:typing.Any
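A minimal sketch of this final CHECK-IN step, assuming `client` is an already authenticated instance of the client documented here and that a CHECK-IN configuration was exported from the future frontend (file names are illustrative):

    # Upload the raw data first; upload_data returns the UUIDs needed here.
    upload_feedback = client.upload_data(source='sales.csv')
    user_input_id = upload_feedback['uuid']
    file_uuid = upload_feedback['files'][0]['uuid']

    # Create the time series from the exported CHECK-IN configuration.
    result = client.create_time_series(user_input_id=user_input_id,
                                       file_uuid=file_uuid,
                                       config_checkin='checkin_config.json')

    # The version ID identifies the created time series in later steps
    # (this mirrors how start_forecast_from_raw_data reads the result).
    version_id = result['result']['tsVersion']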
def get_associator_results(self, id: Union[ReportIdentifier, int]) ‑> AssociatorResult-
Expand source code
def get_associator_results(self, id: Union[ReportIdentifier, int]) -> AssociatorResult: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: futureexpert.forecast_consistency.ConsistentForecastResult """ if self.get_report_type(report_identifier=id) != 'associator': raise ValueError('The given report ID does not belong to an ASSOCIATOR result. ' + 'Please input a different ID.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id result = self.api_client.get_associator_results(group_id=self.group, report_id=report_id) associator_result = result[0] associator_result['input'] = self.api_client.get_ts_data(self.group, associator_result.get('actuals')) associator_result.pop('actuals') return AssociatorResult(**associator_result)Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
return:AssociatorResult
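A one-line sketch, assuming `client` is an authenticated client instance and `associator_report` is the ReportIdentifier of a finished ASSOCIATOR run:

    associator_result = client.get_associator_results(id=associator_report)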
def get_consistent_forecast_results(self, id: Union[ReportIdentifier, int]) ‑> ConsistentForecastResult-
Expand source code
def get_consistent_forecast_results(self, id: Union[ReportIdentifier, int] ) -> ConsistentForecastResult: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: futureexpert.forecast_consistency.ConsistentForecastResult """ if self.get_report_type(report_identifier=id) != 'hierarchical-forecast': raise ValueError('The given report ID does not belong to a reconciled forecast result. ' + 'Please input a different ID.') report_id = id.report_id if isinstance(id, ReportIdentifier) else id results = self.api_client.get_hierarchical_fc_results(group_id=self.group, report_id=report_id) return ConsistentForecastResult(**results)Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
return:ConsistentForecastResult
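A one-line sketch, assuming `client` is an authenticated client instance and `consistency_report` is the ReportIdentifier of a finished reconciliation run:

    consistent_fc = client.get_consistent_forecast_results(id=consistency_report)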
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False,
include_discarded_models: bool = False) ‑> list[ForecastResult]-
Expand source code
def get_fc_results(self, id: Union[ReportIdentifier, int], include_k_best_models: int = 1, include_backtesting: bool = False, include_discarded_models: bool = False) -> list[ForecastResult]: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Forecast identifier or plain report ID. include_k_best_models: builtins.int Number of k best models for which results are to be returned. include_backtesting: builtins.bool Determines whether backtesting results are to be returned. include_discarded_models: builtins.bool Determines if models excluded from ranking should be included in the result. return: builtins.list[futureexpert.forecast.ForecastResult] """ if include_k_best_models < 1: raise ValueError('At least one model is needed.') if self.get_report_type(report_identifier=id) not in ['forecast', 'MongoForecastingResultSink']: raise ValueError('The given report ID does not belong to a FORECAST result. ' + 'Please input a different ID or use get_matcher_results().') report_id = id.report_id if isinstance(id, ReportIdentifier) else id results = self.api_client.get_fc_results(group_id=self.group, report_id=report_id, include_k_best_models=include_k_best_models, include_backtesting=include_backtesting, include_discarded_models=include_discarded_models) return [ForecastResult(**result) for result in results]Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Forecast identifier or plain report ID.
include_k_best_models:builtins.int- Number of k best models for which results are to be returned.
include_backtesting:builtins.bool- Determines whether backtesting results are to be returned.
include_discarded_models:builtins.bool- Determines if models excluded from ranking should be included in the result.
return:builtins.list[ForecastResult]
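A usage sketch, assuming `client` is an authenticated client instance and `report` is the ReportIdentifier returned by start_forecast:

    # Fetch the three best models per time series, including backtesting results.
    fc_results = client.get_fc_results(id=report,
                                       include_k_best_models=3,
                                       include_backtesting=True)
    # Each entry is a futureexpert.forecast.ForecastResult for one time series.
    print(len(fc_results))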
def get_matcher_results(self, id: Union[ReportIdentifier, int]) ‑> list[MatcherResult]-
Expand source code
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> list[MatcherResult]: """Gets the results from the given report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. return: futureexpert.forecast_consistency.ConsistentForecastResult """ if self.get_report_type(report_identifier=id) not in ['matcher', 'CovariateSelection']: raise ValueError('The given report ID does not belong to a MATCHER result. ' + 'Please input a different ID or use get_fc_results().') report_id = id.report_id if isinstance(id, ReportIdentifier) else id results = self.api_client.get_matcher_results(group_id=self.group, report_id=report_id) return [MatcherResult(**result) for result in results]Gets the results from the given report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
return:builtins.list[MatcherResult]
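A short sketch, assuming `client` is an authenticated client instance and `matcher_report` is the ReportIdentifier of a finished MATCHER run:

    matcher_results = client.get_matcher_results(id=matcher_report)
    # One futureexpert.matcher.MatcherResult per actuals time series.
    print(len(matcher_results))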
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview-
Expand source code
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) -> PoolCovOverview: """Gets an overview of all covariates available on POOL according to the given filters. Parameters ---------- granularity: typing.Optional[builtins.str] If set, returns only data matching that granularity (Day or Month). search: typing.Optional[builtins.str] If set, performs a full-text search and only returns data found in that search. Returns ------- PoolCovOverview object with tables containing the covariates with different levels of detail . return: futureexpert.pool.PoolCovOverview """ response_json = self.api_client.get_pool_cov_overview(granularity=granularity, search=search) return PoolCovOverview(response_json)Gets an overview of all covariates available on POOL according to the given filters.
Parameters
granularity:typing.Optional[builtins.str]- If set, returns only data matching that granularity (Day or Month).
search:typing.Optional[builtins.str]- If set, performs a full-text search and only returns data found in that search.
Returns
- PoolCovOverview object with tables containing the covariates with different levels of detail.
return:PoolCovOverview
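A short sketch, assuming an authenticated client instance `client` (the search term is illustrative):

    # Get all monthly POOL covariates whose description matches the search term.
    overview = client.get_pool_cov_overview(granularity='Month', search='inflation')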
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) ‑> ReportStatus-
Expand source code
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus: """Gets the current status of a forecast or matcher report. Parameters ---------- id: typing.Union[futureexpert.expert_client.ReportIdentifier, builtins.int] Report identifier or plain report ID. include_error_reason: builtins.bool Determines whether log messages are to be included in the result. return: futureexpert.expert_client.ReportStatus """ fc_identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None) raw_result = self.api_client.get_report_status(group_id=self.group, report_id=fc_identifier.report_id, include_error_reason=include_error_reason) report_status = raw_result['status_summary'] created = report_status.get('Created', 0) successful = report_status.get('Successful', 0) noeval = report_status.get('NoEvaluation', 0) error = report_status.get('Error', 0) summary = ReportStatusProgress(requested=created, pending=created - successful - noeval - error, finished=successful + noeval + error) results = ReportStatusResults(successful=successful, no_evaluation=noeval, error=error) customer_specific = raw_result.get('customer_specific', None) assert (customer_specific is None or isinstance(customer_specific, dict)), 'unexpected type of customer_specific property' return ReportStatus(id=fc_identifier, progress=summary, results=results, error_reasons=None if customer_specific is None else customer_specific.get('log_messages', None))Gets the current status of a forecast or matcher report.
Parameters
id:typing.Union[ReportIdentifier, builtins.int]- Report identifier or plain report ID.
include_error_reason:builtins.bool- Determines whether log messages are to be included in the result.
return:ReportStatus
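A common pattern is to poll the status until all runs of a report have finished. A sketch, assuming `client` is an authenticated client instance and `report` is the ReportIdentifier of a started report:

    import time

    status = client.get_report_status(id=report)
    # progress.pending counts runs that have neither finished successfully nor failed yet.
    while status.progress.pending > 0:
        time.sleep(30)
        status = client.get_report_status(id=report)
    print(status.results)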
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) ‑> str-
Expand source code
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str: """Gets the type of the given report. Parameters ---------- report_identifier: typing.Union[builtins.int, futureexpert.expert_client.ReportIdentifier] Report identifier or plain report ID. Returns ------- String representation of the type of the report. return: builtins.str """ report_id = report_identifier.report_id if isinstance( report_identifier, ReportIdentifier) else report_identifier return self.api_client.get_report_type(group_id=self.group, report_id=report_id)Gets the type of the given report.
Parameters
report_identifier:typing.Union[builtins.int, ReportIdentifier]- Report identifier or plain report ID.
Returns
- String representation of the type of the report.
return:builtins.str
def get_reports(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame-
Expand source code
def get_reports(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available reports, ordered from newest to oldest. Parameters ---------- skip: builtins.int The number of initial elements of the report list to skip: builtins.int limit: builtins.int The limit on the length of the report list Returns ------- The available reports from newest to oldest. return: pandas.core.frame.DataFrame """ group_reports = self.api_client.get_group_reports(group_id=self.group, skip=skip, limit=limit) vallidated_report_summarys = [ReportSummary.model_validate(report) for report in group_reports] return pd.DataFrame([report_summary.model_dump() for report_summary in vallidated_report_summarys])Gets the available reports, ordered from newest to oldest.
Parameters
skip:builtins.int- The number of initial elements of the report list to skip
limit:builtins.int- The limit on the length of the report list
Returns
- The available reports from newest to oldest.
return:pandas.core.frame.DataFrame
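A short sketch, assuming an authenticated client instance `client`:

    # List the 20 most recent reports as a pandas DataFrame.
    reports_df = client.get_reports(skip=0, limit=20)
    print(reports_df.head())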
def get_time_series(self, version_id: str) ‑> CheckInResult-
Expand source code
def get_time_series(self, version_id: str) -> CheckInResult: """Get time series data. From previously checked-in data. Parameters --------- version_id: builtins.str Id of the time series version. Returns ------- Id of the time series version. Used to identifiy the time series and the values of the time series. return: futureexpert.checkin.CheckInResult """ result = self.api_client.get_ts_data(self.group, version_id) return CheckInResult(time_series=[TimeSeries(**ts) for ts in result], version_id=version_id)Get time series data. From previously checked-in data.
Parameters
version_id:builtins.str- Id of the time series version.
Returns
- The Id of the time series version and the values of the time series.
return:CheckInResult
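A short sketch, assuming `client` is an authenticated client instance and `version_id` was obtained from CHECK-IN:

    checkin_result = client.get_time_series(version_id=version_id)
    # CheckInResult bundles the version ID and the individual TimeSeries objects.
    print(checkin_result.version_id, len(checkin_result.time_series))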
def get_ts_versions(self, skip: int = 0, limit: int = 100) ‑> pandas.core.frame.DataFrame-
Expand source code
def get_ts_versions(self, skip: int = 0, limit: int = 100) -> pd.DataFrame: """Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored. Parameters ---------- skip: builtins.int The number of initial elements of the version list to skip: builtins.int limit: builtins.int The limit on the length of the versjion list Returns ------- Overview of the available time series versions. return: pandas.core.frame.DataFrame """ results = self.api_client.get_group_ts_versions(self.group, skip, limit) transformed_results = [] for version in results: transformed_results.append(TimeSeriesVersion( version_id=version['_id'], description=version.get('description', None), creation_time_utc=version.get('creation_time_utc', None), keep_until_utc=version['customer_specific'].get('keep_until_utc', None) )) transformed_results.sort(key=lambda x: x.creation_time_utc, reverse=True) return pd.DataFrame([res.model_dump() for res in transformed_results])Gets the available time series version, ordered from newest to oldest. keep_until_utc shows the last day where the data is stored.
Parameters
skip:builtins.int- The number of initial elements of the version list to skip
limit:builtins.int- The limit on the length of the version list
Returns
- Overview of the available time series versions.
return:pandas.core.frame.DataFrame
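A short sketch, assuming an authenticated client instance `client`:

    # Overview of the stored time series versions, newest first.
    versions_df = client.get_ts_versions(limit=50)
    print(versions_df[['version_id', 'creation_time_utc', 'keep_until_utc']])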
def start_associator(self,
config: AssociatorConfig) ‑> ReportIdentifier-
Expand source code
def start_associator(self, config: AssociatorConfig) -> ReportIdentifier: """Starts an associator report. Parameters ---------- config: futureexpert.associator.AssociatorConfig Configuration of the associator run. Returns ------- The identifier of the associator report. return: futureexpert.expert_client.ReportIdentifier """ config_dict = config.model_dump() payload = {'payload': config_dict} result = self.api_client.execute_action(group_id=self.group, core_id=self.associator_core_id, payload=payload, interval_status_check_in_seconds=2) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Associator finished') return reportStarts an associator report.
Parameters
config:AssociatorConfig- Configuration of the associator run.
Returns
- The identifier of the associator report.
return:ReportIdentifier
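A short sketch, assuming `client` is an authenticated client instance and `associator_config` is a prepared futureexpert.associator.AssociatorConfig:

    associator_report = client.start_associator(config=associator_config)
    print(associator_report.report_id)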
def start_forecast(self,
version: str,
config: ReportConfig) ‑> ReportIdentifier-
Expand source code
def start_forecast(self, version: str, config: ReportConfig) -> ReportIdentifier: """Starts a forecasting report. Parameters ---------- version: builtins.str ID of a time series version. config: futureexpert.forecast.ReportConfig Configuration of the forecasting report. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ version_data = self.api_client.get_ts_version(self.group, version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) logger.info('Preparing data for forecast...') if not self.is_analyst and (config.db_name is not None or config.priority is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name and priority.') payload = self._create_forecast_payload(version, config) logger.info('Finished data preparation for forecast.') logger.info('Started creating forecasting report with FORECAST...') result = self.api_client.execute_action(group_id=self.group, core_id=self.forecast_core_id, payload=payload, interval_status_check_in_seconds=2) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Forecasts are running...') return reportStarts a forecasting report.
Parameters
version:builtins.str- ID of a time series version.
config:ReportConfig- Configuration of the forecasting report.
Returns
- The identifier of the forecasting report.
return:ReportIdentifier
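A sketch, assuming `client` is an authenticated client instance, `version_id` comes from CHECK-IN and `fc_config` is a prepared futureexpert.forecast.ReportConfig (its fields are documented in futureexpert.forecast):

    report = client.start_forecast(version=version_id, config=fc_config)
    print(f'Started forecast report {report.report_id}')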
def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier-
Expand source code
def start_forecast_from_raw_data(self, raw_data_source: Union[pd.DataFrame, str], config_fc: ReportConfig, data_definition: Optional[DataDefinition] = None, config_ts_creation: Optional[TsCreationConfig] = None, config_checkin: Optional[str] = None, file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier: """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation. Parameters ---------- raw_data_source: typing.Union[pandas.core.frame.DataFrame, builtins.str] A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored. config_fc: futureexpert.forecast.ReportConfig The configuration of the forecast run. data_definition: typing.Optional[futureexpert.checkin.DataDefinition] Specifies the data, value and group columns and which rows and columns should be removed. config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig] Defines filter and aggreagtion level of the time series. config_checkin: typing.Optional[builtins.str] Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de). file_specification: futureexpert.checkin.FileSpecification Needed if a CSV is used with e.g. German format. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.' upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification) user_input_id = upload_feedback['uuid'] file_id = upload_feedback['files'][0]['uuid'] res2 = self.create_time_series(user_input_id=user_input_id, file_uuid=file_id, data_definition=data_definition, config_ts_creation=config_ts_creation, config_checkin=config_checkin, file_specification=file_specification) version = res2['result']['tsVersion'] return self.start_forecast(version=version, config=config_fc)Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.
Parameters
raw_data_source:typing.Union[pandas.core.frame.DataFrame, builtins.str]- A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc:ReportConfig- The configuration of the forecast run.
data_definition:typing.Optional[DataDefinition]- Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation:typing.Optional[TsCreationConfig]- Defines filter and aggregation level of the time series.
config_checkin:typing.Optional[builtins.str]- Path to the JSON file with the CHECK-IN configuration.
`config_ts_creation` and `config_checkin` cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification:FileSpecification- Needed if a CSV is used with e.g. German format.
Returns
- The identifier of the forecasting report.
return:ReportIdentifier
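A sketch of the one-call variant, assuming `client` is an authenticated client instance, `fc_config` is a prepared futureexpert.forecast.ReportConfig and the file names are illustrative:

    import pandas as pd

    from futureexpert import FileSpecification

    raw_data = pd.read_csv('sales.csv', sep=';', decimal=',')
    report = client.start_forecast_from_raw_data(
        raw_data_source=raw_data,
        config_fc=fc_config,
        config_checkin='checkin_config.json',  # exported from the future frontend
        file_specification=FileSpecification(delimiter=';', decimal=','))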
def start_making_forecast_consistent(self,
config: MakeForecastConsistentConfiguration) ‑> ReportIdentifier-
Expand source code
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) -> ReportIdentifier: """Starts process of making forecasts hierarchically consistent. Parameters ---------- config: futureexpert.forecast_consistency.MakeForecastConsistentConfiguration Configuration of the make forecast consistent run. Returns ------- The identifier of the forecasting report. return: futureexpert.expert_client.ReportIdentifier """ logger.info('Preparing data for forecast consistency...') if not self.is_analyst and (config.db_name is not None): raise ValueError('Only users with the role analyst are allowed to use the parameters db_name.') payload = self._create_reconciliation_payload(config) logger.info('Finished data preparation for forecast consistency.') logger.info('Started creating hierarchical reconciliation for consistent forecasts...') result = self.api_client.execute_action(group_id=self.group, core_id=self.hcfc_core_id, payload=payload, interval_status_check_in_seconds=2) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Reconciliation is running...') return reportStarts process of making forecasts hierarchically consistent.
Parameters
config:MakeForecastConsistentConfiguration- Configuration of the make forecast consistent run.
Returns
- The identifier of the forecasting report.
return:ReportIdentifier
def start_matcher(self,
config: MatcherConfig) ‑> ReportIdentifier-
Expand source code
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier: """Starts a covariate matcher report. Parameters ---------- version ID of a time series version config: futureexpert.matcher.MatcherConfig Configuration of the covariate matcher report. Returns ------- The identifier of the covariate matcher report. return: futureexpert.expert_client.ReportIdentifier """ version_data = self.api_client.get_ts_version(self.group, config.actuals_version) config.max_ts_len = calculate_max_ts_len(max_ts_len=config.max_ts_len, granularity=version_data['customer_specific']['granularity']) if not self.is_analyst and config.db_name is not None: raise ValueError('Only users with the role analyst are allowed to use the parameter db_name.') payload = self._create_matcher_payload(config) result = self.api_client.execute_action(group_id=self.group, core_id=self.matcher_core_id, payload=payload, interval_status_check_in_seconds=2) report = ReportIdentifier.model_validate(result) logger.info(f'Report created with ID {report.report_id}. Matching indicators...') return reportStarts a covariate matcher report.
Parameters
config:MatcherConfig- Configuration of the covariate matcher report.
Returns
- The identifier of the covariate matcher report.
return:ReportIdentifier
def switch_group(self, new_group: str, verbose: bool = True) ‑> None-
Expand source code
def switch_group(self, new_group: str, verbose: bool = True) -> None: """Switches the current group. Parameters ---------- new_group: builtins.str The name of the group to activate. verbose: builtins.bool If enabled, shows the group name in the log message. return: builtins.NoneType """ if new_group not in self.api_client.userinfo['groups']: raise RuntimeError(f'You are not authorized to access group {new_group}') self.group = new_group verbose_text = f' for group {self.group}' if verbose else '' logger.info(f'Successfully logged in{verbose_text}.')Switches the current group.
Parameters
new_group:builtins.str- The name of the group to activate.
verbose:builtins.bool- If enabled, shows the group name in the log message.
return:builtins.NoneType
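A one-line sketch (the group name is illustrative and must be one of your authorized groups):

    client.switch_group(new_group='my-other-group')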
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any-
Expand source code
def upload_data(self, source: Union[pd.DataFrame, str], file_specification: Optional[FileSpecification] = None) -> Any: """Uploads the given raw data for further processing. Parameters ---------- source: typing.Union[pandas.core.frame.DataFrame, builtins.str] Path to a CSV file or a pandas data frame. file_specification: typing.Optional[futureexpert.checkin.FileSpecification] If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file. Returns ------- Identifier for the user Inputs. return: typing.Any """ df_file = None if isinstance(source, pd.DataFrame): if not file_specification: file_specification = FileSpecification() csv = source.to_csv(index=False, sep=file_specification.delimiter, decimal=file_specification.decimal, encoding='utf-8-sig') time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S') df_file = (f'expert-{time_stamp}.csv', csv) path = None else: path = source # TODO: currently only one file is supported here. upload_feedback = self.api_client.upload_user_inputs_for_group(self.group, path, df_file) return upload_feedbackUploads the given raw data for further processing.
Parameters
source:typing.Union[pandas.core.frame.DataFrame, builtins.str]- Path to a CSV file or a pandas data frame.
file_specification:typing.Optional[FileSpecification]- If source is a pandas data frame, it will be uploaded as a csv using the specified parameters or the default ones. The parameter has no effect if source is a path to a CSV file.
Returns
- Identifier for the user inputs.
return:typing.Any
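A short sketch for uploading a pandas DataFrame, assuming an authenticated client instance `client` (the data is illustrative):

    import pandas as pd

    from futureexpert import FileSpecification

    df = pd.DataFrame({'date': ['2024-01-01', '2024-02-01'], 'value': [100, 120]})
    # The DataFrame is serialized to CSV with the given specification before upload.
    feedback = client.upload_data(source=df,
                                  file_specification=FileSpecification(delimiter=',', decimal='.'))
    user_input_id = feedback['uuid']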
class FileSpecification (**data: Any)-
Expand source code
class FileSpecification(BaseConfig): """Specify the format of the CSV file. Parameters ---------- delimiter: typing.Optional[builtins.str] The delimiter used to separate values. decimal: typing.Optional[builtins.str] The decimal character used in decimal numbers. thousands: typing.Optional[builtins.str] The thousands separator used in numbers. """ delimiter: Optional[str] = ',' decimal: Optional[str] = '.' thousands: Optional[str] = NoneSpecify the format of the CSV file.
Parameters
delimiter:typing.Optional[builtins.str]- The delimiter used to separate values.
decimal:typing.Optional[builtins.str]- The decimal character used in decimal numbers.
thousands:typing.Optional[builtins.str]- The thousands separator used in numbers.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var decimal : str | Nonevar delimiter : str | Nonevar model_configvar thousands : str | None
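For example, a German-style CSV file (semicolon-separated, comma as decimal mark, dot as thousands separator) could be described as follows (a minimal sketch):

    from futureexpert import FileSpecification

    german_csv = FileSpecification(delimiter=';', decimal=',', thousands='.')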
class FilterSettings (**data: Any)-
Expand source code
class FilterSettings(BaseConfig): """Model for the filters. Parameters ---------- type: typing.Literal['exclusion', 'inclusion'] The type of filter: `exclusion` or `inclusion`. variable: builtins.str The columns name to be used for filtering. items: builtins.list[builtins.str] The list of values to be used for filtering. """ type: Literal['exclusion', 'inclusion'] variable: str items: list[str]Model for the filters.
Parameters
type:typing.Literal['exclusion', 'inclusion']- The type of filter: `exclusion` or `inclusion`.
variable:builtins.str- The column name to be used for filtering.
items:builtins.list[builtins.str]- The list of values to be used for filtering.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var items : list[str]var model_configvar type : Literal['exclusion', 'inclusion']var variable : str
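A minimal sketch (the column name and values are illustrative):

    from futureexpert import FilterSettings

    # Keep only rows whose 'Country' column equals one of the listed values.
    country_filter = FilterSettings(type='inclusion', variable='Country', items=['DE', 'AT'])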
class ForecastingConfig (**data: Any)-
Expand source code
class ForecastingConfig(BaseConfig): """Forecasting configuration. Parameters ---------- fc_horizon Forecast horizon. round_forecast_to_integer If true, then forecasts are rounded to the nearest integer (also applied during backtesting). use_ensemble If true, then calculate ensemble forecasts. Automatically makes a smart decision on which methods to use based on their backtesting performance. lower_bound Lower bound applied to the time series and forecasts. upper_bound Upper bound applied to the time series and forecasts. confidence_level Confidence level for prediction intervals. skip_empirical_prediction_intervals If true, empirical prediction intervals for confidence levels are not calculated. This does not affect models that generate their own prediction intervals.\n\n Disabling this can affect model selection, as plausibility checks on the intervals are also omitted. Setting this to `True` also removes the minimum forecast horizon needed for the intervals, allowing for a shorter `fc_horizon` during backtesting when defined via `step_weights`. working_day_adaptions If present, enables optional working day adaptions of the time series and forecasts. This is currently not compatible with use_ensemble=True. """ fc_horizon: Annotated[ValidatedPositiveInt, pydantic.Field(ge=1, le=60)] round_forecast_to_integer: bool = False use_ensemble: bool = False lower_bound: Union[float, None] = None upper_bound: Union[float, None] = None confidence_level: float = 0.75 skip_empirical_prediction_intervals: bool = False working_day_adaptions: Optional[WorkingDayAdaptionsConfig] = None @property def numeric_bounds(self) -> tuple[float, float]: return ( self.lower_bound if self.lower_bound is not None else -np.inf, self.upper_bound if self.upper_bound is not None else np.inf, ) @pydantic.model_validator(mode='after') def ensemble_incompatible_with_working_days(self) -> Self: """Validator for combination of ensemble model and working day adaptions. Parameters ---------- return: typing.Self """ if self.use_ensemble and self.working_day_adaptions is not None: raise ValueError('use_ensemble and working_days cannot be used together.') return selfForecasting configuration.
Parameters
fc_horizon- Forecast horizon.
round_forecast_to_integer- If true, then forecasts are rounded to the nearest integer (also applied during backtesting).
use_ensemble- If true, then calculate ensemble forecasts. Automatically makes a smart decision on which methods to use based on their backtesting performance.
lower_bound- Lower bound applied to the time series and forecasts.
upper_bound- Upper bound applied to the time series and forecasts.
confidence_level- Confidence level for prediction intervals.
skip_empirical_prediction_intervals-
If true, empirical prediction intervals for confidence levels are not calculated. This does not affect models that generate their own prediction intervals.
Disabling this can affect model selection, as plausibility checks on the intervals are also omitted. Setting this to `True` also removes the minimum forecast horizon needed for the intervals, allowing for a shorter `fc_horizon` during backtesting when defined via `step_weights`.
working_day_adaptions- If present, enables optional working day adaptions of the time series and forecasts. This is currently not compatible with use_ensemble=True.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var confidence_level : floatvar fc_horizon : PositiveIntvar lower_bound : float | Nonevar model_configvar round_forecast_to_integer : boolvar skip_empirical_prediction_intervals : boolvar upper_bound : float | Nonevar use_ensemble : boolvar working_day_adaptions : WorkingDayAdaptionsConfig | None
Instance variables
prop numeric_bounds : tuple[float, float]-
Expand source code
@property def numeric_bounds(self) -> tuple[float, float]: return ( self.lower_bound if self.lower_bound is not None else -np.inf, self.upper_bound if self.upper_bound is not None else np.inf, )
Methods
def ensemble_incompatible_with_working_days(self) ‑> Self-
Expand source code
@pydantic.model_validator(mode='after') def ensemble_incompatible_with_working_days(self) -> Self: """Validator for combination of ensemble model and working day adaptions. Parameters ---------- return: typing.Self """ if self.use_ensemble and self.working_day_adaptions is not None: raise ValueError('use_ensemble and working_days cannot be used together.') return selfValidator for combination of ensemble model and working day adaptions.
Parameters
return:typing.Self
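A minimal sketch of a typical configuration (the concrete values are illustrative):

    from futureexpert import ForecastingConfig

    # 12-step forecast, non-negative, rounded to integers, with 90% prediction intervals.
    forecasting_config = ForecastingConfig(fc_horizon=12,
                                           lower_bound=0.0,
                                           round_forecast_to_integer=True,
                                           confidence_level=0.9)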
class LagSelectionConfig (**data: Any)-
Expand source code
class LagSelectionConfig(BaseModel): """Configures covariate lag selection. Parameters ---------- fixed_lags: typing.Optional[builtins.list[builtins.int]] Lags that are tested in the lag selection. min_lag: typing.Optional[builtins.int] Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate is shifted 3 data points into the future. max_lag: typing.Optional[builtins.int] Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate is shifted 12 data points into the future. """ min_lag: Optional[int] = None max_lag: Optional[int] = None fixed_lags: Optional[list[int]] = None @model_validator(mode='after') def _check_range(self) -> Self: if (self.min_lag is None) ^ (self.max_lag is None): raise ValueError( 'If one of `min_lag` and `max_lag` is set the other one also needs to be set.') if self.min_lag and self.max_lag: if self.fixed_lags is not None: raise ValueError('Fixed lags and min/max lag are mutually exclusive.') if self.max_lag < self.min_lag: raise ValueError('max_lag needs to be greater or equal to min_lag.') lag_range = abs(self.max_lag - self.min_lag) + 1 if lag_range > 15: raise ValueError(f'Only 15 lags are allowed to be tested. The requested range has length {lag_range}.') if self.fixed_lags and len(self.fixed_lags) > 15: raise ValueError( f'Only 15 lags are allowed to be tested. The provided fixed lags has length {len(self.fixed_lags)}.') return selfConfigures covariate lag selection.
Parameters
fixed_lags:typing.Optional[builtins.list[builtins.int]]- Lags that are tested in the lag selection.
min_lag:typing.Optional[builtins.int]- Minimal lag that is tested in the lag selection. For example, a lag 3 means the covariate is shifted 3 data points into the future.
max_lag:typing.Optional[builtins.int]- Maximal lag that is tested in the lag selection. For example, a lag 12 means the covariate is shifted 12 data points into the future.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var fixed_lags : list[int] | Nonevar max_lag : int | Nonevar min_lag : int | Nonevar model_config
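A minimal sketch of both configuration styles (the lags are illustrative):

    from futureexpert import LagSelectionConfig

    # Test all lags from 1 to 6 (min_lag and max_lag must be set together,
    # and the range may cover at most 15 lags).
    range_selection = LagSelectionConfig(min_lag=1, max_lag=6)

    # Alternatively, test an explicit list of lags (mutually exclusive with min/max).
    fixed_selection = LagSelectionConfig(fixed_lags=[1, 3, 6, 12])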
class MakeForecastConsistentConfiguration (**data: Any)-
Expand source code
class MakeForecastConsistentConfiguration(BaseConfig): """Service configuration. Parameters ---------- data_selection: futureexpert.forecast_consistency.MakeForecastConsistentDataSelection Configuration on the selection of time series and forecasts used for carrying out the reconciliation. report_note: builtins.str Note of the report. db_name: typing.Optional[builtins.str] Only accessible for internal use. Name of the database to use for storing the results. reconciliation: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig] Optional reconciliation configuration. If not provided, defaults will be used. """ data_selection: MakeForecastConsistentDataSelection report_note: str db_name: Optional[str] = None reconciliation: Optional[ReconciliationConfig] = NoneService configuration.
Parameters
data_selection:MakeForecastConsistentDataSelection- Configuration on the selection of time series and forecasts used for carrying out the reconciliation.
report_note:builtins.str- Note of the report.
db_name:typing.Optional[builtins.str]- Only accessible for internal use. Name of the database to use for storing the results.
reconciliation:typing.Optional[ReconciliationConfig]- Optional reconciliation configuration. If not provided, defaults will be used.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var data_selection : MakeForecastConsistentDataSelectionvar db_name : str | Nonevar model_configvar reconciliation : ReconciliationConfig | Nonevar report_note : str
class MakeForecastConsistentDataSelection (**data: Any)-
Expand source code
class MakeForecastConsistentDataSelection(BaseConfig): """Forecast and time series selection for making forecast consistent. Parameters ---------- version: builtins.str Time series version to be used. fc_report_id: builtins.int The identifier of the forecasting report to be used. """ version: str fc_report_id: intForecast and time series selection for making forecast consistent.
Parameters
version:builtins.str- Time series version to be used.
fc_report_id:builtins.int- The identifier of the forecasting report to be used.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var fc_report_id : intvar model_configvar version : str
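A sketch of a reconciliation run, assuming `client` is an authenticated client instance, `version_id` is the time series version of the actuals and `fc_report` is the ReportIdentifier of an existing forecast report (the note is illustrative):

    from futureexpert import (MakeForecastConsistentConfiguration,
                              MakeForecastConsistentDataSelection)

    consistency_config = MakeForecastConsistentConfiguration(
        data_selection=MakeForecastConsistentDataSelection(version=version_id,
                                                           fc_report_id=fc_report.report_id),
        report_note='reconciliation of monthly sales forecasts')
    consistency_report = client.start_making_forecast_consistent(config=consistency_config)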
class MatcherConfig (**data: Any)-
Expand source code
class MatcherConfig(BaseConfig): """Configuration for a MATCHER run. Parameters ---------- title: builtins.str A short description of the report. actuals_version: builtins.str The version ID of the actuals. covs_versions: builtins.list[builtins.str] List of versions of the covariates. actuals_filter: builtins.dict[builtins.str, typing.Any] Filter criterion for actuals time series. The given actuals version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors covs_filter: builtins.dict[builtins.str, typing.Any] Filter criterion for covariates time series. The given covariate version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors max_ts_len: typing.Optional[builtins.int] At most this number of most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration. lag_selection: futureexpert.matcher.LagSelectionConfig Configuration of covariate lag selection. evaluation_start_date: typing.Optional[builtins.str] Optional start date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations prior to this start date are dropped. evaluation_end_date: typing.Optional[builtins.str] Optional end date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations after this end date are dropped. max_publication_lag: builtins.int Maximal publication lag for the covariates. The publication lag of a covariate is the number of most recent observations (compared to the actuals) that are missing for the covariate. E.g., if the actuals (for monthly granularity) end in April 2023 but the covariate ends in February 2023, the covariate has a publication lag of 2. associator_report_id: typing.Optional[builtins.int] Optional report id of clustering results. If None, the database is searched for a fitting clustering. The clustering results are used in the post-selection. If there are too many selected behind this is that they all would give similar results in forecasting. Only used if `use_clustering_results` is true. use_clustering_results: builtins.bool If true clustering results are used. post_selection_queries: builtins.list[builtins.str] List of queries that are executed on the ranking summary DataFrame. Only ranking entries that match the queries are kept. The query strings need to satisfy the pandas query syntax (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns of the ranking summary DataFrame that you might want to filter on: Column Name | Data Type | Description ----------------------------------------------------------------------------------------------- Lag | Int64 | Lag of the covariate. Rank | float64 | Rank of the model. BetterThanNoCov | bool | Indicates whether the model is better than the non-cov model. 
enable_leading_covariate_selection: builtins.bool When True, all covariates after the lag is applied that do not have at least one more datapoint beyond the the time period covered by actuals are removed from the candidate covariates passed to covariate selection. fixed_season_length: typing.Optional[builtins.int] An optional parameter specifying the length of a season in the dataset. pool_covs: typing.Optional[builtins.list[futureexpert.pool.PoolCovDefinition]] List of covariate definitions. db_name: typing.Optional[builtins.str] Only accessible for internal use. Name of the database to use for storing the results. rerun_report_id: typing.Optional[builtins.int] ReportId from which failed runs should be recomputed. Ensure to use the same ts_version. Otherwise all time series get computed again. rerun_status: list[typing.Literal['Error', 'NoEvaluation']] Status of the runs that should be computed again. `Error` and/or `NoEvaluation`. """ title: str actuals_version: str covs_versions: list[str] = Field(default_factory=list) actuals_filter: dict[str, Any] = Field(default_factory=dict) covs_filter: dict[str, Any] = Field(default_factory=dict) max_ts_len: Annotated[ Optional[int], pydantic.Field(ge=1, le=1500)] = None lag_selection: LagSelectionConfig = LagSelectionConfig() evaluation_start_date: Optional[str] = None evaluation_end_date: Optional[str] = None max_publication_lag: int = 2 associator_report_id: Optional[pydantic.PositiveInt] = None use_clustering_results: bool = False post_selection_queries: list[str] = [] enable_leading_covariate_selection: bool = True fixed_season_length: Optional[int] = None pool_covs: Optional[list[PoolCovDefinition]] = None db_name: Optional[str] = None rerun_report_id: Optional[int] = None rerun_status: list[RerunStatus] = ['Error'] @model_validator(mode='after') def _validate_post_selection_queries(self) -> Self: # Validate the post-selection queries. invalid_queries = [] columns = { 'Lag': 'int', 'Rank': 'float', 'BetterThanNoCov': 'bool' } # Create an empty DataFrame with the specified column names and data types validation_df = pd.DataFrame(columns=columns.keys()).astype(columns) for postselection_query in self.post_selection_queries: try: validation_df.query(postselection_query, ) except Exception: invalid_queries.append(postselection_query) if len(invalid_queries): raise ValueError("The following post-selection queries are invalidly formatted: " f"{', '.join(invalid_queries)}. ") return self @model_validator(mode='after') def _validate_rerun_report_id(self) -> Self: if self.rerun_report_id is not None and self.pool_covs is not None: raise ValueError('rerun_report_id can not be used with pool_covs. ' 'Use the exact covs_version used in the rerun_report_id.') return selfConfiguration for a MATCHER run.
Parameters
title:builtins.str- A short description of the report.
actuals_version:builtins.str- The version ID of the actuals.
covs_versions:builtins.list[builtins.str]- List of versions of the covariates.
actuals_filter:builtins.dict[builtins.str, typing.Any]- Filter criterion for actuals time series. The given actuals version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
covs_filter:builtins.dict[builtins.str, typing.Any]- Filter criterion for covariates time series. The given covariate version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len:typing.Optional[builtins.int]- At most this number of most recent observations of the actuals time series is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
lag_selection:LagSelectionConfig- Configuration of covariate lag selection.
evaluation_start_date:typing.Optional[builtins.str]- Optional start date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations prior to this start date are dropped.
evaluation_end_date:typing.Optional[builtins.str]- Optional end date for the evaluation. The input should be in the ISO format with date and time, "YYYY-mm-DDTHH-MM-SS", e.g., "2024-01-01T16:40:00". Actuals and covariate observations after this end date are dropped.
max_publication_lag:builtins.int- Maximal publication lag for the covariates. The publication lag of a covariate is the number of most recent observations (compared to the actuals) that are missing for the covariate. E.g., if the actuals (for monthly granularity) end in April 2023 but the covariate ends in February 2023, the covariate has a publication lag of 2.
associator_report_id:typing.Optional[builtins.int]- Optional report ID of clustering results. If None, the database is searched for a fitting clustering. The clustering results are used in the post-selection: if there are too many selected covariates, the reasoning behind this is that they would all give similar results in forecasting. Only used if `use_clustering_results` is true.
use_clustering_results:builtins.bool- If true, clustering results are used.
post_selection_queries:builtins.list[builtins.str]-
List of queries that are executed on the ranking summary DataFrame. Only ranking entries that match the queries are kept. The query strings need to satisfy the pandas query syntax (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html). Here are the columns of the ranking summary DataFrame that you might want to filter on:
Column Name | Data Type | Description
Lag | Int64 | Lag of the covariate.
Rank | float64 | Rank of the model.
BetterThanNoCov | bool | Indicates whether the model is better than the non-cov model.
enable_leading_covariate_selection:builtins.bool- When True, all covariates that, after the lag is applied, do not have at least one more data point beyond the time period covered by the actuals are removed from the candidate covariates passed to covariate selection.
fixed_season_length:typing.Optional[builtins.int]- An optional parameter specifying the length of a season in the dataset.
pool_covs:typing.Optional[builtins.list[PoolCovDefinition]]- List of covariate definitions.
db_name:typing.Optional[builtins.str]- Only accessible for internal use. Name of the database to use for storing the results.
rerun_report_id:typing.Optional[builtins.int]- Report ID from which failed runs should be recomputed. Make sure to use the same ts_version; otherwise, all time series are computed again.
rerun_status:list[typing.Literal['Error', 'NoEvaluation']]- Status of the runs that should be computed again: `Error` and/or `NoEvaluation`.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var actuals_filter : dict[str, typing.Any]var actuals_version : strvar associator_report_id : int | Nonevar covs_filter : dict[str, typing.Any]var covs_versions : list[str]var db_name : str | Nonevar enable_leading_covariate_selection : boolvar evaluation_end_date : str | Nonevar evaluation_start_date : str | Nonevar fixed_season_length : int | Nonevar lag_selection : LagSelectionConfigvar max_publication_lag : intvar max_ts_len : int | Nonevar model_configvar pool_covs : list[PoolCovDefinition] | Nonevar post_selection_queries : list[str]var rerun_report_id : int | Nonevar rerun_status : list[typing.Literal['Error', 'NoEvaluation']]var title : strvar use_clustering_results : bool
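A sketch of configuring and starting a MATCHER run, assuming `client` is an authenticated client instance and that `actuals_version` and `covs_version` are version IDs from CHECK-IN (the title and query are illustrative):

    from futureexpert import LagSelectionConfig, MatcherConfig

    matcher_config = MatcherConfig(
        title='Sales vs. macroeconomic indicators',
        actuals_version=actuals_version,
        covs_versions=[covs_version],
        lag_selection=LagSelectionConfig(min_lag=1, max_lag=6),
        # Keep only covariate models that beat the non-covariate baseline.
        post_selection_queries=['BetterThanNoCov == True'])
    matcher_report = client.start_matcher(config=matcher_config)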
class MethodSelectionConfig (**data: Any)-
Expand source code
class MethodSelectionConfig(BaseConfig): """Method selection configuration. Parameters ---------- number_iterations: futureexpert.shared_models.PositiveInt Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals. shift_len: futureexpert.shared_models.PositiveInt Number of time points by which the test window is shifted between backtesting iterations. backtesting_strategy: typing.Literal['standard', 'equal_coverage'] Selects the methodology for backtesting. - 'standard': A standard rolling forecast. The evaluation window with fixed length is shifted at each step. This strategy is controlled by `number_iterations` and `shift_len`. - 'equal_coverage': A balanced strategy that guarantees every data point within the `equal_coverage_size` is forecasted the same number of times. This strategy has specific requirements: It uses a `shift_len` of 1 and the number of iterations is calculated automatically based on the `equal_coverage_size` and forecast horizon, ignoring the `number_iterations` parameter. equal_coverage_size: typing.Optional[futureexpert.shared_models.PositiveInt] Number of recent data points to test when `backtesting_strategy` `equal_coverage` is active. If None or chosen length is too long, it tries most common season length of a time series granularity instead. refit: builtins.bool If true, then models are refitted for each backtesting iteration. default_error_metric: typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape'] Error metric applied to the backtesting error for non-sporadic time series. sporadic_error_metric: typing.Literal['pis', 'sapis', 'acr', 'mar', 'msr'] Error metric applied to the backtesting errors for sporadic time series. additional_accuracy_measures: list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']] Additional accuracy measures for solely reporting purposes. Does not affect internal evaluation or model ranking. step_weights: typing.Optional[builtins.dict[futureexpert.shared_models.PositiveInt, builtins.float]] Mapping from forecast steps to weights associated to forecast errors for that forecasting step. - Purpose: Applied only on error-metrics for non-sporadic time series. - Weights: Only positive weights are allowed. If a forecast step is not included in the dictionary, it will be assigned a weight of zero. - Forecast Horizon: The highest key in this dictionary defines the forecast horizon for backtesting, if `skip_empirical_prediction_intervals` is set to `True`. additional_cov_method: typing.Optional[typing.Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost']] Define up to one additional method that uses the defined covariates for creating forecasts. Will not be calculated if deemed unfit by the preselection. If the parameter forecasting_methods: typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']] is defined, the additional cov method must appear in that list, too. 
cov_combination: typing.Literal['single', 'joint'] Create a forecast model for each individual covariate (single) or a model using all covariates together (joint). forecasting_methods: typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']] Define specific forecasting methods to be tested for generating forecasts. Specifying fewer methods can significantly reduce the runtime of forecast creation. If not specified, all available forecasting methods will be used by default. Given methods are automatically preselected based on time series characteristics of your data. If none of the given methods fits your data, a fallback set of forecasting methods will be used instead. phase_out_fc_methods: typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']] List of methods that will be used to forecast phase-out time series. Phase-out detection must be enabled in preprocessing configuration to take effect. """ number_iterations: Annotated[ValidatedPositiveInt, pydantic.Field(ge=1, le=24)] = PositiveInt(12) shift_len: ValidatedPositiveInt = PositiveInt(1) refit: bool = False default_error_metric: Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape'] = 'mse' sporadic_error_metric: Literal['pis', 'sapis', 'acr', 'mar', 'msr'] = 'pis' additional_accuracy_measures: list[Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']] = pydantic.Field(default_factory=list) step_weights: Optional[dict[ValidatedPositiveInt, pydantic.PositiveFloat]] = None additional_cov_method: Optional[AdditionalCovMethod] = None cov_combination: Literal['single', 'joint'] = 'single' forecasting_methods: Sequence[ForecastingMethods] = pydantic.Field(default_factory=list) phase_out_fc_methods: Sequence[ForecastingMethods] = pydantic.Field(default_factory=lambda: ['ZeroForecast']) backtesting_strategy: Literal['standard', 'equal_coverage'] = 'standard' equal_coverage_size: Optional[ValidatedPositiveInt] = None @pydantic.model_validator(mode="after") def shift_length_valid_when_equal_coverage_active(self) -> Self: if (self.shift_len != 1 and self.backtesting_strategy == 'equal_coverage'): raise ValueError('Equal-Coverage-Backtesting-Strategy only allows a shift length of 1.') return self @pydantic.model_validator(mode="after") def step_weights_not_empty(self) -> Self: if self.step_weights is not None and len(self.step_weights) == 0: raise ValueError('Empty dictionary for step_weights is not allowed.') return selfMethod selection configuration.
Parameters
number_iterations:PositiveInt- Number of backtesting iterations. At least 8 iterations are needed for empirical prediction intervals.
shift_len:PositiveInt- Number of time points by which the test window is shifted between backtesting iterations.
backtesting_strategy:typing.Literal['standard', 'equal_coverage']- Selects the methodology for backtesting.
- 'standard': A standard rolling forecast. The evaluation window with fixed length is shifted at each step.
This strategy is controlled by `number_iterations` and `shift_len`.
- 'equal_coverage': A balanced strategy that guarantees every data point within the `equal_coverage_size` is forecasted the same number of times. This strategy has specific requirements: it uses a `shift_len` of 1, and the number of iterations is calculated automatically based on the `equal_coverage_size` and forecast horizon, ignoring the `number_iterations` parameter.
equal_coverage_size:typing.Optional[PositiveInt]- Number of recent data points to test when `backtesting_strategy` 'equal_coverage' is active. If None or the chosen length is too long, the most common season length of the time series granularity is tried instead.
refit:builtins.bool- If true, then models are refitted for each backtesting iteration.
default_error_metric:typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']- Error metric applied to the backtesting error for non-sporadic time series.
sporadic_error_metric:typing.Literal['pis', 'sapis', 'acr', 'mar', 'msr']- Error metric applied to the backtesting errors for sporadic time series.
additional_accuracy_measures:list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]- Additional accuracy measures used solely for reporting purposes. They do not affect the internal evaluation or model ranking.
step_weights:typing.Optional[builtins.dict[PositiveInt, builtins.float]]- Mapping from forecast steps to the weights associated with the forecast errors of those steps.
- Purpose: Applied only to the error metrics for non-sporadic time series.
- Weights: Only positive weights are allowed. If a forecast step is not included in the dictionary, it is assigned a weight of zero.
- Forecast Horizon: The highest key in this dictionary defines the forecast horizon for backtesting, if skip_empirical_prediction_intervals is set to True.
additional_cov_method:typing.Optional[typing.Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost']]- Define at most one additional method that uses the defined covariates for creating forecasts. It is not calculated if the preselection deems it unfit for the data. If forecasting_methods is defined, the additional cov method must appear in that list, too.
cov_combination:typing.Literal['single', 'joint']- Create a forecast model for each individual covariate (single) or a model using all covariates together (joint).
forecasting_methods:typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]- Define specific forecasting methods to be tested for generating forecasts. Specifying fewer methods can significantly reduce the runtime of forecast creation. If not specified, all available forecasting methods will be used by default. Given methods are automatically preselected based on time series characteristics of your data. If none of the given methods fits your data, a fallback set of forecasting methods will be used instead.
phase_out_fc_methods:typing.Sequence[typing.Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]- List of methods that will be used to forecast phase-out time series. Phase-out detection must be enabled in preprocessing configuration to take effect.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var additional_accuracy_measures : list[typing.Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape', 'pis', 'sapis', 'acr', 'mar', 'msr']]
var additional_cov_method : Literal['AdaBoost', 'AutoArima', 'CART', 'CatBoost', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'LightGBM', 'LinearRegression', 'MLP', 'RandomForest', 'SVM', 'XGBoost'] | None
var backtesting_strategy : Literal['standard', 'equal_coverage']
var cov_combination : Literal['single', 'joint']
var default_error_metric : Literal['me', 'mpe', 'mse', 'mae', 'mase', 'mape', 'smape']
var equal_coverage_size : PositiveInt | None
var forecasting_methods : Sequence[Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
var model_config
var number_iterations : PositiveInt
var phase_out_fc_methods : Sequence[Literal['AdaBoost', 'Aft4Sporadic', 'AutoArima', 'AutoEsCov', 'CART', 'CatBoost', 'Croston', 'ES', 'ExtraTrees', 'FoundationModel', 'Glmnet(l1_ratio=1.0)', 'MA(granularity)', 'InterpolID', 'LightGBM', 'LinearRegression', 'MedianAS', 'MedianPattern', 'MLP', 'MostCommonValue', 'MA(3)', 'Naive', 'RandomForest', 'MA(season lag)', 'SVM', 'TBATS', 'Theta', 'TSB', 'XGBoost', 'ZeroForecast']]
var refit : bool
var shift_len : PositiveInt
var sporadic_error_metric : Literal['pis', 'sapis', 'acr', 'mar', 'msr']
var step_weights : dict[PositiveInt, float] | None
Methods
def shift_length_valid_when_equal_coverage_active(self) ‑> Self-
Expand source code
@pydantic.model_validator(mode="after")
def shift_length_valid_when_equal_coverage_active(self) -> Self:
    if (self.shift_len != 1 and self.backtesting_strategy == 'equal_coverage'):
        raise ValueError('Equal-Coverage-Backtesting-Strategy only allows a shift length of 1.')
    return self
def step_weights_not_empty(self) ‑> Self-
Expand source code
@pydantic.model_validator(mode="after")
def step_weights_not_empty(self) -> Self:
    if self.step_weights is not None and len(self.step_weights) == 0:
        raise ValueError('Empty dictionary for step_weights is not allowed.')
    return self
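Usage sketch (not part of the generated reference): a minimal, hypothetical construction of MethodSelectionConfig. The import path futureexpert.forecast is taken from the parameter annotations elsewhere in this reference; the chosen methods, metrics, and step weights are illustrative only and assume that plain ints and floats are coerced to the validated types by pydantic.
from futureexpert.forecast import MethodSelectionConfig

# Restrict backtesting and the candidate method pool (illustrative values).
method_selection = MethodSelectionConfig(
    number_iterations=8,                     # at least 8 iterations for empirical prediction intervals
    shift_len=1,
    default_error_metric='mae',
    forecasting_methods=['ES', 'AutoArima', 'Theta'],
    step_weights={1: 1.0, 2: 0.5, 3: 0.25},  # steps missing from the dict get weight zero
)

# 'equal_coverage' requires shift_len == 1 (the default); any other shift_len is
# rejected by the validator shift_length_valid_when_equal_coverage_active.
balanced = MethodSelectionConfig(
    backtesting_strategy='equal_coverage',
    equal_coverage_size=12,
)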
class PreprocessingConfig (**data: Any)-
Expand source code
class PreprocessingConfig(BaseConfig): """Preprocessing configuration. Parameters ---------- remove_leading_zeros: builtins.bool If true, then leading zeros are removed from the time series before forecasting. Is only applied if the time series has at least 5 values, including missing values. use_season_detection: builtins.bool If true, then the season length is determined from the data. seasonalities_to_test: typing.Optional[builtins.list[typing.Union[builtins.list[futureexpert.shared_models.PositiveInt], futureexpert.shared_models.PositiveInt]]] Season lengths to be tested. If not defined, a suitable set for the given granularity is used. Season lengths can only be tested, if the number of observations is at least three times as long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should be considered, too. Allows a combination of single granularities or combinations of granularities. fixed_seasonalities: typing.Optional[builtins.list[futureexpert.shared_models.PositiveInt]] Season lengths used without checking. Allowed only if `use_season_detection` is false. detect_outliers: builtins.bool If true, then identifies outliers in the data. replace_outliers: builtins.bool If true, then identified outliers are replaced. detect_changepoints: builtins.bool If true, then change points such as level shifts are identified. detect_quantization: builtins.bool If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic time series data and, if one has been detected, applies it to the forecasts. phase_out_method: typing.Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS'] Choose which method will be used to detect Phase-Out in timeseries or turn it OFF. TRAILING_ZEROS method uses the number of trailing zeros to detect Phase-Out. AUTO_FEW_OBS method uses few-observation-changepoints at the end of the time series to detect Phase-Out. AUTO_FEW_OBS is only allowed if `detect_changepoints` is true. num_trailing_zeros_for_phase_out: futureexpert.shared_models.PositiveInt Number of trailing zeros in timeseries to detect Phase-Out with TRAILING_ZEROS method. recent_trend_num_observations: typing.Optional[futureexpert.shared_models.PositiveInt] Number of observations which are included in time span used for recent trend detection. recent_trend_num_seasons: typing.Optional[futureexpert.shared_models.PositiveInt] Number of seasons which are included in time span used for recent trend detection. If both recent_trend_num_seasons and recent_trend_num_observations are set, the longer time span is used. 
""" remove_leading_zeros: bool = False use_season_detection: bool = True # empty lists and None are treated the same in apollon seasonalities_to_test: Optional[list[Union[list[ValidatedPositiveInt], ValidatedPositiveInt]]] = None fixed_seasonalities: Optional[list[ValidatedPositiveInt]] = None detect_outliers: bool = False replace_outliers: bool = False detect_changepoints: bool = False detect_quantization: bool = False phase_out_method: Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS'] = 'OFF' num_trailing_zeros_for_phase_out: ValidatedPositiveInt = PositiveInt(5) recent_trend_num_observations: Optional[ValidatedPositiveInt] = PositiveInt(6) recent_trend_num_seasons: Optional[ValidatedPositiveInt] = PositiveInt(2) @pydantic.model_validator(mode='after') def _has_no_fixed_seasonalities_if_uses_season_detection(self) -> Self: if self.use_season_detection and self.fixed_seasonalities: raise ValueError('If fixed seasonalities is enabled, then season detection must be off.') return self @pydantic.model_validator(mode='after') def _has_detect_changepoints_if_phase_out_method_is_auto_few_obs(self) -> Self: if not self.detect_changepoints and self.phase_out_method == 'AUTO_FEW_OBS': raise ValueError('If phase_out_method is set to AUTO_FEW_OBS, then detect_changepoints must be on.') return self @pydantic.model_validator(mode='after') def _has_no_recent_trend_num_observation_nor_num_seasons(self) -> Self: if not self.recent_trend_num_observations and not self.recent_trend_num_seasons: raise ValueError( 'Both recent_trend_num_observations and recent_trend_num_seasons cannot be None at the same time.') return selfPreprocessing configuration.
Parameters
remove_leading_zeros:builtins.bool- If true, then leading zeros are removed from the time series before forecasting. Is only applied if the time series has at least 5 values, including missing values.
use_season_detection:builtins.bool- If true, then the season length is determined from the data.
seasonalities_to_test:typing.Optional[builtins.list[typing.Union[builtins.list[PositiveInt], PositiveInt]]]- Season lengths to be tested. If not defined, a suitable set for the given granularity is used. Season lengths can only be tested if the number of observations is at least three times as long as the biggest season length. Note that 1 must be in the list if the non-seasonal case should be considered, too. Entries may be single season lengths or combinations of season lengths.
fixed_seasonalities:typing.Optional[builtins.list[PositiveInt]]- Season lengths used without checking. Allowed only if use_season_detection is false.
detect_outliers:builtins.bool- If true, then outliers in the data are identified.
replace_outliers:builtins.bool- If true, then identified outliers are replaced.
detect_changepoints:builtins.bool- If true, then change points such as level shifts are identified.
detect_quantization:builtins.bool- If true, a quantization algorithm is applied to the time series. Recognizes quantizations in the historic time series data and, if one has been detected, applies it to the forecasts.
phase_out_method:typing.Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS']- Choose which method is used to detect phase-out in a time series, or turn detection off.
The TRAILING_ZEROS method uses the number of trailing zeros to detect phase-out.
The AUTO_FEW_OBS method uses few-observation change points at the end of the time series to detect phase-out.
AUTO_FEW_OBS is only allowed if detect_changepoints is true.
num_trailing_zeros_for_phase_out:PositiveInt- Number of trailing zeros in a time series required to detect phase-out with the TRAILING_ZEROS method.
recent_trend_num_observations:typing.Optional[PositiveInt]- Number of observations included in the time span used for recent trend detection.
recent_trend_num_seasons:typing.Optional[PositiveInt]- Number of seasons included in the time span used for recent trend detection. If both recent_trend_num_seasons and recent_trend_num_observations are set, the longer time span is used.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var detect_changepoints : bool
var detect_outliers : bool
var detect_quantization : bool
var fixed_seasonalities : list[PositiveInt] | None
var model_config
var num_trailing_zeros_for_phase_out : PositiveInt
var phase_out_method : Literal['OFF', 'TRAILING_ZEROS', 'AUTO_FEW_OBS']
var recent_trend_num_observations : PositiveInt | None
var recent_trend_num_seasons : PositiveInt | None
var remove_leading_zeros : bool
var replace_outliers : bool
var seasonalities_to_test : list[list[PositiveInt] | PositiveInt] | None
var use_season_detection : bool
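Usage sketch (hypothetical, not from the package documentation): constructing a PreprocessingConfig with season detection and trailing-zero phase-out detection. The import path futureexpert.forecast follows the annotations used elsewhere in this reference; all values are examples only.
from futureexpert.forecast import PreprocessingConfig

preprocessing = PreprocessingConfig(
    use_season_detection=True,
    seasonalities_to_test=[1, 12],           # include 1 so the non-seasonal case is also considered
    detect_outliers=True,
    replace_outliers=True,
    phase_out_method='TRAILING_ZEROS',
    num_trailing_zeros_for_phase_out=6,
)

# Note: phase_out_method='AUTO_FEW_OBS' additionally requires detect_changepoints=True;
# otherwise the model validator raises a ValueError.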
class ReconciliationConfig (**data: Any)-
Expand source code
class ReconciliationConfig(BaseConfig):
    """Configuration for hierarchical reconciliation process.

    Parameters
    ----------
    method: futureexpert.forecast_consistency.ReconciliationMethod
        Primary reconciliation method to use
    fallback_methods: typing.list[futureexpert.forecast_consistency.ReconciliationMethod]
        List of fallback methods to try if primary method fails
    excluded_levels: typing.list[builtins.str]
        Set of hierarchy levels to exclude from reconciliation
    actuals_period_length: typing.Optional[builtins.int]
        Number of last datapoints from actuals to use for proportion calculation (None = all)
    forecast_period_length: typing.Optional[builtins.int]
        Number of datapoints from forecasts to use for proportion calculation (None = all)
    """

    method: ReconciliationMethod = ReconciliationMethod.BOTTOM_UP
    fallback_methods: List[ReconciliationMethod] = Field(default_factory=list)
    excluded_levels: List[str] = Field(default_factory=list)
    actuals_period_length: Optional[int] = None
    forecast_period_length: Optional[int] = None

Configuration for hierarchical reconciliation process.
Parameters
method:ReconciliationMethod- Primary reconciliation method to use.
fallback_methods:typing.List[ReconciliationMethod]- List of fallback methods to try if the primary method fails.
excluded_levels:typing.List[builtins.str]- Set of hierarchy levels to exclude from reconciliation.
actuals_period_length:typing.Optional[builtins.int]- Number of last data points from the actuals to use for proportion calculation (None = all).
forecast_period_length:typing.Optional[builtins.int]- Number of data points from the forecasts to use for proportion calculation (None = all).
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var actuals_period_length : int | None
var excluded_levels : List[str]
var fallback_methods : List[ReconciliationMethod]
var forecast_period_length : int | None
var method : ReconciliationMethod
var model_config
class ReconciliationMethod (*args, **kwds)-
Expand source code
class ReconciliationMethod(str, Enum):
    """Reconciliation methods for hierarchical forecasting."""

    BOTTOM_UP = "bottom_up"
    TOP_DOWN_PROPORTION_AVERAGES = "top_down_proportion_averages"
    TOP_DOWN_FORECAST_PROPORTION = "top_down_forecast_proportion"
    MIN_TRACE_WLS_STRUCT = "min_trace_wls_struct"

Reconciliation methods for hierarchical forecasting.
Ancestors
- builtins.str
- enum.Enum
Class variables
var BOTTOM_UP
var MIN_TRACE_WLS_STRUCT
var TOP_DOWN_FORECAST_PROPORTION
var TOP_DOWN_PROPORTION_AVERAGES
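Usage sketch (hypothetical): combining ReconciliationConfig and ReconciliationMethod for a bottom-up reconciliation with a structural fallback. The module path futureexpert.forecast_consistency is taken from the parameter annotations above; the excluded level name is a made-up placeholder.
from futureexpert.forecast_consistency import ReconciliationConfig, ReconciliationMethod

reconciliation = ReconciliationConfig(
    method=ReconciliationMethod.BOTTOM_UP,
    fallback_methods=[ReconciliationMethod.MIN_TRACE_WLS_STRUCT],
    excluded_levels=['region'],              # hypothetical hierarchy level name
    actuals_period_length=24,                # use only the last 24 actuals for proportions
)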
class ReportConfig (**data: Any)-
Expand source code
class ReportConfig(BaseConfig): """Forecast run configuration. Parameters ---------- matcher_report_id: typing.Optional[builtins.int] Report ID of the covariate matcher. covs_versions: builtins.list[builtins.str] List of versions of the covariates. covs_configuration: typing.Optional[builtins.list[futureexpert.matcher.ActualsCovsConfiguration]] Mapping from actuals and covariates. Use for custom covariate or adjusted matcher results. If the matcher results should be used without changes use `matcher_report_id` instead. title: builtins.str Title of the report. actuals_filter: builtins.dict[builtins.str, typing.Any] Filter criterion for actuals time series. The given actuals version is automatically added as additional filter criterion. Possible Filter criteria are all fields that are part of the TimeSeries class. e.g. {'name': 'Sales'} For more complex filter check: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors max_ts_len: typing.Optional[builtins.int] At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration. preprocessing: futureexpert.forecast.PreprocessingConfig Preprocessing configuration. forecasting: futureexpert.forecast.ForecastingConfig Forecasting configuration. method_selection: typing.Optional[futureexpert.forecast.MethodSelectionConfig] Method selection configuration. If not supplied, then a granularity dependent default is used. pool_covs: typing.Optional[builtins.list[futureexpert.pool.PoolCovDefinition]] List of covariate definitions. rerun_report_id: typing.Optional[builtins.int] ReportId from which failed runs should be recomputed. Ensure to use the same ts_version. Otherwise all time series get computed again. rerun_status: list[typing.Literal['Error', 'NoEvaluation']] Status of the runs that should be computed again. `Error` and/or `NoEvaluation`. db_name: typing.Optional[builtins.str] Only accessible for internal use. Name of the database to use for storing the results. priority: typing.Optional[builtins.int] Only accessible for internal use. Higher value indicate higher priority. """ title: str forecasting: ForecastingConfig matcher_report_id: Optional[int] = None covs_versions: list[str] = Field(default_factory=list) covs_configuration: Optional[list[ActualsCovsConfiguration]] = None actuals_filter: dict[str, Any] = Field(default_factory=dict) max_ts_len: Optional[int] = None preprocessing: PreprocessingConfig = PreprocessingConfig() pool_covs: Optional[list[PoolCovDefinition]] = None method_selection: Optional[MethodSelectionConfig] = None rerun_report_id: Optional[int] = None rerun_status: list[RerunStatus] = ['Error'] db_name: Optional[str] = None priority: Annotated[Optional[int], pydantic.Field(ge=0, le=10)] = None @pydantic.model_validator(mode="after") def _correctness_of_cov_configurations(self) -> Self: if (self.matcher_report_id or self.covs_configuration) and ( len(self.covs_versions) == 0 and self.pool_covs is None): raise ValueError( 'If one of `matcher_report_id` and `covs_configuration` is set also `covs_versions` needs to be set.') if (self.matcher_report_id is None and self.covs_configuration is None) and ( self.covs_versions or self.pool_covs): raise ValueError( 'If `covs_versions` or `pool_covs` is set ' + 'either `matcher_report_id` or `covs_configuration` needs to be set.') if self.covs_configuration is not None and len(self.covs_configuration) == 0: raise ValueError('`covs_configuration` has length zero and therefore won`t have any effect. 
' 'Please remove the parameter or set to None.') return self @pydantic.model_validator(mode="after") def _only_one_covariate_definition(self) -> Self: fields = [ 'matcher_report_id', 'pool_covs' ] set_fields = [field for field in fields if getattr(self, field) is not None] if len(set_fields) > 1: raise ValueError(f"Only one of {', '.join(fields)} can be set. Found: {', '.join(set_fields)}") return self @pydantic.model_validator(mode="after") def _backtesting_step_weights_refer_to_valid_forecast_steps(self) -> Self: if (self.method_selection and self.method_selection.step_weights and max(self.method_selection.step_weights.keys()) > self.forecasting.fc_horizon): raise ValueError('Step weights must not refer to forecast steps beyond the fc_horizon.') return self @pydantic.model_validator(mode="after") def _valid_covs_version(self) -> Self: for covs_version in self.covs_versions: if re.match('^[0-9a-f]{24}$', covs_version) is None: raise ValueError(f'Given covs_version "{covs_version}" is not a valid ObjectId.') return self @pydantic.model_validator(mode='after') def _has_valid_phase_out_detection_method_if_phase_out_fc_method_was_changed(self) -> Self: if ((self.method_selection and self.method_selection.phase_out_fc_methods != ['ZeroForecast']) and self.preprocessing.phase_out_method == 'OFF'): # A warning is logged instead of raising an error since this does not cause downstream issues. # The user is informed that their changes to phase_out_fc_methods have no effect # to clarify the relationship between these settings. logger.warning('Phase-out detection must be enabled in PreprocessingConfig' ' so changes in phase_out_fc_methods in MethodSelectionConfig take effect.') return self @pydantic.model_validator(mode='after') def _has_non_empty_phase_out_fc_method_if_phase_out_detection_is_on(self) -> Self: if (self.method_selection and not self.method_selection.phase_out_fc_methods and self.preprocessing.phase_out_method != 'OFF'): raise ValueError('Phase out forecasting method cannot be empty when phase out detection is enabled.') return selfForecast run configuration.
Parameters
matcher_report_id:typing.Optional[builtins.int]- Report ID of the covariate matcher.
covs_versions:builtins.list[builtins.str]- List of versions of the covariates.
covs_configuration:typing.Optional[builtins.list[ActualsCovsConfiguration]]- Mapping between actuals and covariates. Use for custom covariates or adjusted matcher results. If the matcher results should be used without changes, use matcher_report_id instead.
title:builtins.str- Title of the report.
actuals_filter:builtins.dict[builtins.str, typing.Any]- Filter criterion for actuals time series. The given actuals version is automatically added as an additional filter criterion. Possible filter criteria are all fields that are part of the TimeSeries class, e.g. {'name': 'Sales'}. For more complex filters see: https://www.mongodb.com/docs/manual/reference/operator/query/#query-selectors
max_ts_len:typing.Optional[builtins.int]- At most this number of most recent observations is used. Check the variable MAX_TS_LEN_CONFIG for allowed configuration.
preprocessing:PreprocessingConfig- Preprocessing configuration.
forecasting:ForecastingConfig- Forecasting configuration.
method_selection:typing.Optional[MethodSelectionConfig]- Method selection configuration. If not supplied, then a granularity dependent default is used.
pool_covs:typing.Optional[builtins.list[PoolCovDefinition]]- List of covariate definitions.
rerun_report_id:typing.Optional[builtins.int]- Report ID from which failed runs should be recomputed. Make sure to use the same ts_version; otherwise all time series are computed again.
rerun_status:list[typing.Literal['Error', 'NoEvaluation']]- Status of the runs that should be computed again: 'Error' and/or 'NoEvaluation'.
db_name:typing.Optional[builtins.str]- Only accessible for internal use. Name of the database to use for storing the results.
priority:typing.Optional[builtins.int]- Only accessible for internal use. Higher values indicate higher priority.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var actuals_filter : dict[str, typing.Any]
var covs_configuration : list[ActualsCovsConfiguration] | None
var covs_versions : list[str]
var db_name : str | None
var forecasting : ForecastingConfig
var matcher_report_id : int | None
var max_ts_len : int | None
var method_selection : MethodSelectionConfig | None
var model_config
var pool_covs : list[PoolCovDefinition] | None
var preprocessing : PreprocessingConfig
var priority : int | None
var rerun_report_id : int | None
var rerun_status : list[typing.Literal['Error', 'NoEvaluation']]
var title : str
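Usage sketch (hypothetical): assembling a ReportConfig for a forecast run. The classes are assumed to be importable from futureexpert.forecast, as the annotations suggest. Only fc_horizon is passed to ForecastingConfig because its full signature is not shown in this section and it may require further fields; the title, filter, and max_ts_len values are illustrative.
from futureexpert.forecast import (ForecastingConfig, MethodSelectionConfig,
                                   PreprocessingConfig, ReportConfig)

config = ReportConfig(
    title='Monthly sales forecast',
    forecasting=ForecastingConfig(fc_horizon=12),   # fc_horizon is referenced by the step-weight validator
    preprocessing=PreprocessingConfig(detect_outliers=True, replace_outliers=True),
    method_selection=MethodSelectionConfig(number_iterations=8),
    actuals_filter={'name': 'Sales'},
    max_ts_len=60,                                  # must match an allowed MAX_TS_LEN_CONFIG entry
)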
class TrendDetectionConfiguration (**data: Any)-
Expand source code
class TrendDetectionConfiguration(BaseConfig):
    """Configuration for trend detection.

    Parameters
    ----------
    end_time: typing.Optional[datetime.datetime]
        End (inclusive) of the time span used for trend detection.
    max_number_of_obs: builtins.int
        Width of the time span used for trend detection; (leading and trailing) missing values
        are disregarded, that is, at most this number of observations are used for a given time series.
    number_of_nans_tolerated: builtins.int
        Leading and lagging missing values are dropped prior to running the trend detection;
        if this results in a loss of more than this number of observations lost, then the trend
        is considered undetermined.
    """

    end_time: Optional[datetime] = None
    max_number_of_obs: int = Field(default=6, gt=0)
    number_of_nans_tolerated: int = 2

Configuration for trend detection.
Parameters
end_time:typing.Optional[datetime.datetime]- End (inclusive) of the time span used for trend detection.
max_number_of_obs:builtins.int- Width of the time span used for trend detection; (leading and trailing) missing values are disregarded, that is, at most this number of observations is used for a given time series.
number_of_nans_tolerated:builtins.int- Leading and trailing missing values are dropped prior to running the trend detection; if this results in more than this number of observations being lost, then the trend is considered undetermined.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var end_time : datetime.datetime | None
var max_number_of_obs : int
var model_config
var number_of_nans_tolerated : int
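Usage sketch (hypothetical): a TrendDetectionConfiguration that looks at a slightly longer recent window. The import path futureexpert.associator is assumed from the package layout; the date and counts are examples only.
from datetime import datetime

from futureexpert.associator import TrendDetectionConfiguration

trend_detection = TrendDetectionConfiguration(
    end_time=datetime(2024, 12, 31),   # inclusive end of the detection window
    max_number_of_obs=12,
    number_of_nans_tolerated=3,
)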
class TsCreationConfig (**data: Any)-
Expand source code
class TsCreationConfig(BaseConfig): """Configuration for the creation of time series. Parameters ---------- value_columns_to_save: builtins.list[builtins.str] Value columns that should be saved. time_granularity: typing.Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly'] Target granularity of the time series. description: typing.Optional[builtins.str] A short description of the time series. start_date: typing.Optional[builtins.str] Dates before this date are excluded. end_date: typing.Optional[builtins.str] Dates after this date are excluded. grouping_level: builtins.list[builtins.str] Names of group columns that should be used as the grouping level. save_hierarchy: builtins.bool If true, interpretes the given grouping levels as levels of a hierarchy and saves all hierachy levels. Otherwise, no hierarchy levels are implied and only the single level with the given grouping is saved. e.g. if grouping_level is ['A', 'B', 'C'] time series of grouping 'A', 'AB' and 'ABC' is saved. For later filtering use {'grouping.A': {'$exists': True}} filter: builtins.list[futureexpert.checkin.FilterSettings] Settings for including or excluding values during time series creation. new_variables: builtins.list[futureexpert.checkin.NewValue] New value column that is a combination of two other value columns. missing_value_handler: typing.Literal['keepNaN', 'setToZero'] Strategy how to handle missing values during time series creation. """ value_columns_to_save: list[str] time_granularity: Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly'] description: Optional[str] = None grouping_level: list[str] = [] start_date: Optional[str] = None end_date: Optional[str] = None save_hierarchy: bool = False filter: list[FilterSettings] = [] new_variables: list[NewValue] = [] missing_value_handler: Literal['keepNaN', 'setToZero'] = 'keepNaN'Configuration for the creation of time series.
Parameters
value_columns_to_save:builtins.list[builtins.str]- Value columns that should be saved.
time_granularity:typing.Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']- Target granularity of the time series.
description:typing.Optional[builtins.str]- A short description of the time series.
start_date:typing.Optional[builtins.str]- Dates before this date are excluded.
end_date:typing.Optional[builtins.str]- Dates after this date are excluded.
grouping_level:builtins.list[builtins.str]- Names of group columns that should be used as the grouping level.
save_hierarchy:builtins.bool- If true, interprets the given grouping levels as levels of a hierarchy and saves all hierarchy levels. Otherwise, no hierarchy levels are implied and only the single level with the given grouping is saved. E.g., if grouping_level is ['A', 'B', 'C'], time series for the groupings 'A', 'AB', and 'ABC' are saved. For later filtering use {'grouping.A': {'$exists': True}}.
filter:builtins.list[FilterSettings]- Settings for including or excluding values during time series creation.
new_variables:builtins.list[NewValue]- New value column that is a combination of two other value columns.
missing_value_handler:typing.Literal['keepNaN', 'setToZero']- Strategy for handling missing values during time series creation.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- BaseConfig
- pydantic.main.BaseModel
Class variables
var description : str | None
var end_date : str | None
var filter : list[FilterSettings]
var grouping_level : list[str]
var missing_value_handler : Literal['keepNaN', 'setToZero']
var model_config
var new_variables : list[NewValue]
var save_hierarchy : bool
var start_date : str | None
var time_granularity : Literal['yearly', 'quarterly', 'monthly', 'weekly', 'daily', 'hourly', 'halfhourly']
var value_columns_to_save : list[str]
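Usage sketch (hypothetical): a TsCreationConfig that builds monthly time series grouped by two columns and saves the implied hierarchy levels. The import path futureexpert.checkin is assumed from the FilterSettings annotation above; the column names and the date string format are placeholders.
from futureexpert.checkin import TsCreationConfig

ts_config = TsCreationConfig(
    value_columns_to_save=['sales'],            # hypothetical value column
    time_granularity='monthly',
    grouping_level=['region', 'product'],       # hypothetical group columns
    save_hierarchy=True,                        # also saves the higher-level grouping 'region'
    start_date='2020-01-01',                    # assumed date string format
    missing_value_handler='setToZero',
)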