Module futureexpert.expert_client

Classes

class ExpertClient (refresh_token: Optional[str] = None,
access_token: Optional[str] = None,
group: Optional[str] = None,
environment: "Optional[Literal['production', 'staging', 'development']]" = None,
timeout: int = 300,
max_retries: int = 3)
Expand source code
class ExpertClient:
    """Client for the FutureEXPERT REST API.

    This client provides the same interface as futureexpert.ExpertClient but
    communicates with the expert-api REST API instead of directly with the backend.

    It can be used as a drop-in replacement for ExpertClient when you want to
    use the REST API instead of the Python SDK.
    """

    def __init__(
        self,
        refresh_token: Optional[str] = None,
        access_token: Optional[str] = None,
        group: Optional[str] = None,
        environment: Optional[Literal['production', 'staging', 'development']] = None,
        timeout: int = 300,
        max_retries: int = 3
    ) -> None:
        """Initialize the client from a token.

        If you want to login using username and password, consider using ExpertClient.from_user_password.

        Parameters
        ----------
        refresh_token
            Authentication refresh token for Bearer authentication.
            If not provided, uses environment variable FUTURE_REFRESH_TOKEN.

            You can retrieve a long-lived refresh token (offline token) in the user settings of the futureEXPERT Dashboard
            or using the Open ID Connect token endpoint of our identity provider.

            Example for calling the token endpoint with scope `offline_access`:
            curl -s -X POST "https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token" \
                    -H "Content-Type: application/x-www-form-urlencoded" \
                    --data-urlencode "client_id=expert" \
                    --data-urlencode "grant_type=password" \
                    --data-urlencode "scope=openid offline_access" \
                    --data-urlencode "username=$FUTURE_USER" \
                    --data-urlencode "password=$FUTURE_PW" | jq -r .refresh_token
        access_token
            Authentication access token for Bearer authentication.

            If used instead of refresh_token, no automated token refresh is possible.
        group
            Optional group name for users in multiple groups.
            If not provided, uses environment variable FUTURE_GROUP.
        environment
            Optional environment (production, staging, development).
            If not provided, uses environment variable FUTURE_ENVIRONMENT.
        timeout
            Request timeout in seconds (default: 300)
        max_retries
            Maximum number of retries for failed requests (default: 3)

        Raises
        ------
        ValueError
            If neither a refresh token nor an access token is available, or if the
            group is ambiguous (user belongs to multiple groups and none was selected).
        """
        # Explicit arguments take precedence over environment variables; 'production' is the fallback.
        self.environment = cast(Literal['production', 'staging', 'development'],
                                environment or os.getenv('FUTURE_ENVIRONMENT') or 'production')
        # EXPERT_API_URL overrides the per-environment default; trailing slash stripped for clean path joins.
        self.api_url = os.getenv('EXPERT_API_URL', _EXPERT_API_URLS[self.environment]).rstrip('/')
        self.auth_client = FutureAuthClient(environment=self.environment)
        self.group = group or os.getenv('FUTURE_GROUP')

        # Token resolution: a refresh token (argument or env var) wins over a plain access token.
        refresh_token = refresh_token or os.getenv('FUTURE_REFRESH_TOKEN')
        if refresh_token:
            self._oauth_token = self.auth_client.refresh_token(refresh_token)
        else:
            if access_token:
                # Decode access_token token for token signature validation
                self.auth_client.decode_token(access_token)

                # A token without `refresh_token` is never tried to be refreshed in OAuth2Client.
                # A token without `expires_at` / `expires_in` is considered not expired by OAuth2Client.
                self._oauth_token = {'access_token': access_token}
            else:
                raise ValueError(
                    'A token must be provided via parameter `refresh_token` or `access_token` '
                    'or FUTURE_REFRESH_TOKEN environment variable.\nAlternatively, use `.from_user_password`.'
                )

        # Resolve the group automatically when the user belongs to exactly one group.
        if not self.group:
            authorized_groups = self.auth_client.get_user_groups(self._oauth_token['access_token'])
            if len(authorized_groups) == 1:
                self.group = authorized_groups[0]
            else:
                raise ValueError(
                    f'You have access to multiple groups. Please select one of the following: {authorized_groups}')

        self.timeout = timeout
        self.max_retries = max_retries
        # Small cache for statuses of finished reports (their status never changes again).
        self.report_status_cache: LRUCache[str, ReportStatus] = LRUCache(maxsize=5)

        logger.info('Successfully logged in to futureEXPERT.')

    def _update_token(self, token: dict[str, Any], refresh_token: str = '', access_token: str = '') -> None:
        """Callback for authlib's OAuth2Client to store a refreshed token.

        The signature (token, refresh_token, access_token) is required by OAuth2Client's
        update_token interface.

        Parameters
        ----------
        token
            The refreshed OAuth2 token dictionary as returned by the token endpoint.
        refresh_token
            Unused; present only to satisfy the update_token callback signature.
        access_token
            Unused; present only to satisfy the update_token callback signature.
        """
        self._oauth_token = token

    @property
    def oauth2_client(self) -> OAuth2Client:
        """Build a fresh authlib OAuth2Client bound to the expert API base URL.

        The underlying httpx transport retries failed requests, and refreshed
        tokens are written back to this instance via ``_update_token``.
        """
        auth_config = self.auth_client.auth_configuration
        retry_transport = httpx.HTTPTransport(retries=self.max_retries)

        return OAuth2Client(
            client_id=auth_config.auth_client_id,
            token_endpoint=self.auth_client.openid_configuration.token_endpoint,
            token_endpoint_auth_method=auth_config.token_endpoint_auth_method,
            token=self._oauth_token,
            update_token=self._update_token,
            leeway=30,
            base_url=self.api_url,
            timeout=self.timeout,
            transport=retry_transport,
        )

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[Mapping[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        files: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Make an HTTP request to the expert API.

        The configured group is always appended as a query parameter.

        Parameters
        ----------
        method
            HTTP method (GET, POST, etc.).
        path
            API endpoint path.
        params
            Optional query parameters. Defaults to None (previously a mutable
            ``{}`` default, which is a shared-mutable-default pitfall).
        json_data
            JSON request body.
        files
            Files for multipart upload.
        data
            Form data for multipart upload.

        Returns
        -------
        Parsed JSON response data, or None for empty responses.

        Raises
        ------
        ValueError
            If the server responds with status code 400.
        RuntimeError
            If the server responds with status code 500.
        httpx.HTTPStatusError
            For any other error status code.
        """
        try:
            params_with_group = {**(params or {}), 'group': self.group}
            with self.oauth2_client as client:
                response = client.request(
                    method=method,
                    url=path,
                    params=params_with_group,
                    json=json_data,
                    files=files,
                    data=data
                )
            response.raise_for_status()

            # Return parsed JSON or None for empty responses
            if response.content:
                return response.json()
            return None

        except httpx.HTTPStatusError as e:
            logger.error(
                f'API request {method} {path} failed with status code {e.response.status_code}: {e.response.text}')
            # Translate common API errors into idiomatic Python exceptions.
            if e.response.status_code == 400:
                raise ValueError(e.response.text)
            if e.response.status_code == 500:
                raise RuntimeError(e.response.text)
            raise

    # ==================== Data Upload and Check-in ====================

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def upload_data(
        self,
        source: Union[pd.DataFrame, str],
        file_specification: Optional[FileSpecification] = None
    ) -> Any:
        """Upload raw data for further processing.

        Parameters
        ----------
        source
            Path to a CSV file or a pandas DataFrame.
        file_specification
            File specification for CSV parsing.

        Returns
        -------
        Upload feedback with user_input_id and file_uuid.
        """
        spec_json = json.dumps(file_specification.model_dump()) if file_specification else None

        if isinstance(source, pd.DataFrame):
            # DataFrames are serialized to JSON records and sent as form data.
            form_data = {'data': json.dumps(source.to_dict(orient='records'))}
            if spec_json:
                form_data['file_specification'] = spec_json
            return self._request('POST', '/api/v1/check-in/data', data=form_data)

        # Otherwise treat the source as a file path and upload the raw file.
        with open(source, 'rb') as csv_file:
            form_data = {'file_specification': spec_json} if spec_json else {}
            return self._request('POST', '/api/v1/check-in/data',
                                 files={'file': csv_file}, data=form_data)

    def get_data(self) -> Any:
        """Get available raw data.

        Returns
        -------
        Meta information of the data already uploaded.
        """
        return self._request('GET', '/api/v1/check-in/data')

    @validate_call
    def check_data_definition(
        self,
        user_input_id: str,
        file_uuid: str,
        data_definition: DataDefinition,
        file_specification: FileSpecification = FileSpecification()
    ) -> Any:
        """Check a data definition against an uploaded file.

        Parameters
        ----------
        user_input_id
            UUID of the user input.
        file_uuid
            UUID of the file.
        data_definition
            Data definition specification.
        file_specification
            File specification for CSV parsing.

        Returns
        -------
        Validation result.
        """
        logger.info('Started data definition using CHECK-IN...')
        request_body = {
            'user_input_id': user_input_id,
            'file_uuid': file_uuid,
            'data_definition': data_definition.model_dump(),
            'file_specification': file_specification.model_dump(),
        }
        validation_result = self._request('POST', '/api/v1/check-in/validate', json_data=request_body)
        logger.info('Finished data definition.')
        return validation_result

    @validate_call
    def create_time_series(
        self,
        user_input_id: str,
        file_uuid: str,
        data_definition: Optional[DataDefinition] = None,
        config_ts_creation: Optional[TsCreationConfig] = None,
        config_checkin: Optional[str] = None,
        file_specification: FileSpecification = FileSpecification()
    ) -> Any:
        """Create time series from already uploaded data.

        This is the second step of the check-in process, after upload_data.

        Parameters
        ----------
        user_input_id
            UUID of the user input (from upload_data response).
        file_uuid
            UUID of the file (from upload_data response).
        data_definition
            Data definition specification.
        config_ts_creation
            Time series creation configuration.
        config_checkin
            Path to JSON config file (alternative to data_definition + config_ts_creation).
        file_specification
            File specification for CSV parsing.

        Returns
        -------
        Time series creation result with version information.
        """
        logger.info('Creating time series using CHECK-IN...')
        form_data: Dict[str, Any] = {
            'user_input_id': user_input_id,
            'file_uuid': file_uuid,
        }

        # Serialize whichever optional configuration models were supplied.
        optional_configs = (
            ('data_definition', data_definition),
            ('config_ts_creation', config_ts_creation),
            ('file_specification', file_specification),
        )
        for field_name, model in optional_configs:
            if model:
                form_data[field_name] = json.dumps(model.model_dump())

        if config_checkin:
            # The CHECK-IN config file is uploaded alongside the form fields.
            with open(config_checkin, 'rb') as config_file:
                result = self._request('POST', '/api/v1/check-in/create',
                                       files={'config_checkin': config_file}, data=form_data)
        else:
            result = self._request('POST', '/api/v1/check-in/create', files=None, data=form_data)

        logger.info('Finished time series creation.')
        return result

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def check_in_time_series(
        self,
        raw_data_source: Union[pd.DataFrame, Path, str],
        data_definition: Optional[DataDefinition] = None,
        config_ts_creation: Optional[TsCreationConfig] = None,
        config_checkin: Optional[str] = None,
        file_specification: FileSpecification = FileSpecification()
    ) -> str:
        """Check in time series data.

        Only available in `Standard`, `Premium` and `Enterprise` subscription packages.

        Parameters
        ----------
        raw_data_source
            DataFrame with raw data or path to CSV file.
        data_definition
            Data definition specification.
        config_ts_creation
            Time series creation configuration.
        config_checkin
            Path to JSON config file (alternative to data_definition + config_ts_creation).
        file_specification
            File specification for CSV parsing.

        Returns
        -------
        Version ID of the created time series.
        """
        form_data: Dict[str, Any] = {}
        if data_definition:
            form_data['data_definition'] = json.dumps(data_definition.model_dump())
        if config_ts_creation:
            form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
        if file_specification:
            form_data['file_specification'] = json.dumps(file_specification.model_dump())

        config_file = open(config_checkin, 'rb') if config_checkin else None
        try:
            files: Dict[str, Any] = {}
            if config_file is not None:
                files['config_checkin'] = config_file

            if isinstance(raw_data_source, pd.DataFrame):
                with tempfile.TemporaryDirectory() as tmpdir:
                    time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
                    file_path = os.path.join(tmpdir, f'expert-{time_stamp}.csv')
                    date_format = data_definition.date_column.format if data_definition else None
                    raw_data_source.to_csv(path_or_buf=file_path, index=False, sep=file_specification.delimiter,
                                           decimal=file_specification.decimal, encoding='utf-8-sig',
                                           date_format=date_format)
                    # Close the CSV handle before TemporaryDirectory cleanup runs.
                    # Previously the handle stayed open past cleanup, which fails on
                    # Windows (open files block directory deletion).
                    with open(file_path, 'rb') as data_file:
                        files['file'] = data_file
                        result = self._request('POST', '/api/v1/check-in', files=files, data=form_data)
            else:
                with open(raw_data_source, 'rb') as data_file:
                    files['file'] = data_file
                    result = self._request('POST', '/api/v1/check-in', files=files, data=form_data)
            return str(result['version_id'])
        finally:
            if config_file is not None:
                config_file.close()

    @validate_call
    def check_in_pool_covs(
        self,
        requested_pool_covs: List[PoolCovDefinition],
        description: Optional[str] = None
    ) -> CheckInPoolResult:
        """Create a new version from pool covariates.

        Parameters
        ----------
        requested_pool_covs
            List of pool covariate definitions.
        description
            Short description of the selected covariates.

        Returns
        -------
        CheckInPoolResult with version_id and metadata.
        """
        logger.info('Creating time series using checkin-pool...')
        request_body = {
            'requested_pool_covs': [definition.model_dump() for definition in requested_pool_covs],
            'description': description,
        }
        raw_result = self._request('POST', '/api/v1/check-in/pool-covariate', json_data=request_body)
        logger.info('Finished time series creation.')
        return CheckInPoolResult(**raw_result)

    # ==================== Time Series ====================

    @validate_call
    def get_time_series(self, version_id: str) -> CheckInResult:
        """Get time series data by version ID.

        Parameters
        ----------
        version_id
            Time series version ID.

        Returns
        -------
        CheckInResult with time series data.
        """
        raw_result = self._request('GET', f'/api/v1/ts/{version_id}')
        return CheckInResult(**raw_result)

    @validate_call
    def get_ts_versions(self, skip: int = 0, limit: int = 100) -> PydanticModelList[TimeSeriesVersion]:
        """Get list of time series versions.

        Parameters
        ----------
        skip
            Number of items to skip.
        limit
            Maximum number of items to return.

        Returns
        -------
        List of time series versions.
        """
        raw_versions = self._request('GET', '/api/v1/ts', params={'skip': skip, 'limit': limit})
        parsed_versions = [TimeSeriesVersion.model_validate(raw_version) for raw_version in raw_versions]
        return PydanticModelList(parsed_versions)

    # ==================== Pool Covariates ====================

    @validate_call
    def get_pool_cov_overview(
        self,
        granularity: Optional[str] = None,
        search: Optional[str] = None
    ) -> PoolCovOverview:
        """Get overview of available pool covariates.

        Parameters
        ----------
        granularity
            Filter by granularity (Day or Month).
        search
            Full-text search query.

        Returns
        -------
        PoolCovOverview with available covariates.
        """
        # Only include filters that were actually provided.
        query = {name: value
                 for name, value in (('granularity', granularity), ('search', search))
                 if value}

        raw_result = self._request('GET', '/api/v1/pool', params=query)
        return PoolCovOverview(overview_json=raw_result['overview_json'])

    # ==================== Forecasting ====================

    @validate_call
    def start_forecast(
        self,
        version: str,
        config: ReportConfig,
        reconciliation_config: Optional[ReconciliationConfig] = None
    ) -> Union[ReportIdentifier, ChainedReportIdentifier]:
        """Start a forecasting report.

        Parameters
        ----------
        version
            Time series version ID.
        config
            Forecast configuration.
        reconciliation_config
            Configuration to make forecasts consistent over hierarchical levels.

        Returns
        -------
        ReportIdentifier with report_id and settings_id.
        If reconciliation_config is provided, returns ChainedReportIdentifier
        with prerequisites containing the forecast report identifier.
        """
        request_body: Dict[str, Any] = {
            'version': version,
            'config': config.model_dump(),
        }
        if reconciliation_config is not None:
            request_body['reconciliation_config'] = reconciliation_config.model_dump()

        logger.info('Started creating FORECAST...')
        result = self._request('POST', '/api/v1/forecast', json_data=request_body)

        # The API returns a chained identifier when reconciliation prerequisites exist.
        if 'prerequisites' in result:
            report_identifier: Union[ReportIdentifier, ChainedReportIdentifier] = (
                ChainedReportIdentifier.model_validate(result))
        else:
            report_identifier = ReportIdentifier.model_validate(result)

        logger.info(f'Report created with ID {report_identifier.report_id}. Forecasts are running...')
        return report_identifier

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def start_forecast_from_raw_data(self,
                                     raw_data_source: Union[pd.DataFrame, Path, str],
                                     config_fc: ReportConfig,
                                     data_definition: Optional[DataDefinition] = None,
                                     config_ts_creation: Optional[TsCreationConfig] = None,
                                     config_checkin: Optional[str] = None,
                                     file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
        """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

        Parameters
        ----------
        raw_data_source
            A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
        config_fc
            The configuration of the forecast run.
        data_definition
            Specifies the data, value and group columns and which rows and columns should be removed.
        config_ts_creation
            Defines filter and aggregation level of the time series.
        config_checkin
            Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
            cannot be set simultaneously. The configuration may be obtained from the last step of
            CHECK-IN using the future frontend (now.future-forecasting.de).
        file_specification
            Needed if a CSV is used with e.g. German format.

        Returns
        -------
        The identifier of the forecasting report.

        Raises
        ------
        ValueError
            If `config_fc` carries a `rerun_report_id`.
        """
        # Validate with an explicit exception instead of `assert`, which is
        # silently stripped when Python runs with -O optimizations.
        if config_fc.rerun_report_id is not None:
            raise ValueError('start_forecast_from_raw_data can not be used with rerun_report_id.')

        upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

        user_input_id = upload_feedback['uuid']
        file_id = upload_feedback['files'][0]['uuid']

        creation_result = self.create_time_series(user_input_id=user_input_id,
                                                  file_uuid=file_id,
                                                  data_definition=data_definition,
                                                  config_ts_creation=config_ts_creation,
                                                  config_checkin=config_checkin,
                                                  file_specification=file_specification)

        version = creation_result['result']['tsVersion']
        return self.start_forecast(version=version, config=config_fc)

    @validate_call
    def get_fc_results(
        self,
        id: Union[ReportIdentifier, int],
        include_k_best_models: int = 1,
        include_backtesting: bool = False,
        include_discarded_models: bool = False
    ) -> ForecastResults:
        """Get forecast results.

        Parameters
        ----------
        id
            Report identifier or report ID.
        include_k_best_models
            Number of best models to include.
        include_backtesting
            Include backtesting results.
        include_discarded_models
            Include discarded models.

        Returns
        -------
        ForecastResults with forecast data (empty if results are not loadable yet).
        """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        # Return an empty result set while results cannot be loaded yet.
        if not self._can_load_results(self.get_report_status(id=id)):
            return ForecastResults(forecast_results=[])

        query = {
            'include_k_best_models': include_k_best_models,
            'include_backtesting': include_backtesting,
            'include_discarded_models': include_discarded_models,
        }
        raw_result = self._request('GET', f'/api/v1/forecast/{report_id}/results', params=query)

        parsed_results = ForecastResults(
            forecast_results=[ForecastResult.model_validate(entry)
                              for entry in raw_result['forecast_results']])

        raw_consistency = raw_result.get('consistency')
        if raw_consistency is not None:
            parsed_results.consistency = ConsistentForecastMetadata.model_validate(raw_consistency)

        return parsed_results

    # ==================== Matcher ====================

    @validate_call
    def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
        """Start a covariate matcher report.

        Parameters
        ----------
        config
            Matcher configuration.

        Returns
        -------
        ReportIdentifier with report_id and settings_id.
        """
        raw_result = self._request('POST', '/api/v1/matcher', json_data={'config': config.model_dump()})
        report_identifier = ReportIdentifier.model_validate(raw_result)
        logger.info(f'Report created with ID {report_identifier.report_id}. Matching indicators...')
        return report_identifier

    @validate_call
    def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> List[MatcherResult]:
        """Get matcher results.

        Parameters
        ----------
        id
            Report identifier or report ID.

        Returns
        -------
        List of MatcherResult objects (empty if results are not loadable yet).
        """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        # Nothing to fetch while results cannot be loaded yet.
        if not self._can_load_results(self.get_report_status(id=id)):
            return []

        raw_results = self._request('GET', f'/api/v1/matcher/{report_id}/results')
        return [MatcherResult(**entry) for entry in raw_results]

    # ==================== Associator ====================

    @validate_call
    def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
        """Start an associator report.

        Parameters
        ----------
        config
            Associator configuration.

        Returns
        -------
        ReportIdentifier with report_id and settings_id.
        """
        raw_result = self._request('POST', '/api/v1/associator', json_data={'config': config.model_dump()})
        report_identifier = ReportIdentifier.model_validate(raw_result)
        logger.info(f'Report created with ID {report_identifier.report_id}. Associator is running...')
        return report_identifier

    @validate_call
    def get_associator_results(self, id: Union[ReportIdentifier, int]) -> Optional[AssociatorResult]:
        """Get associator results.

        Parameters
        ----------
        id
            Report identifier or report ID.

        Returns
        -------
        Results of the ASSOCIATOR report, or None if results are not loadable yet.
        """
        report_id = id.report_id if isinstance(id, ReportIdentifier) else id

        # Nothing to fetch while results cannot be loaded yet.
        if not self._can_load_results(self.get_report_status(id=id)):
            return None

        raw_result = self._request('GET', f'/api/v1/associator/{report_id}/results')
        return AssociatorResult(**raw_result)

    # ==================== Reports ====================

    @validate_call
    def get_reports(self, skip: int = 0, limit: int = 100) -> PydanticModelList[ReportSummary]:
        """Get list of available reports.

        Parameters
        ----------
        skip
            Number of items to skip.
        limit
            Maximum number of items to return.

        Returns
        -------
        The available reports from newest to oldest.
        """
        raw_reports = self._request('GET', '/api/v1/report', params={'skip': skip, 'limit': limit})
        return PydanticModelList([ReportSummary.model_validate(raw_report) for raw_report in raw_reports])

    @validate_call
    def _get_single_report_status(self, report_identifier: ReportIdentifier, include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a single report.

        Parameters
        ----------
        report_identifier
            Report identifier.
        include_error_reason
            Determines whether log messages are to be included in the result.

        Returns
        -------
        The status of the report.

        Raises
        ------
        RuntimeError
            If the report type has no known status endpoint.
        """
        report_id = report_identifier.report_id

        # Finished reports never change status again, so serve them from the cache.
        cache_key = f'{report_id}_{include_error_reason}'
        if cache_key in self.report_status_cache:
            return self.report_status_cache[cache_key]

        report_type = self.get_report_type(report_identifier=report_id)

        # Map each known report type to its status endpoint segment.
        endpoint_by_type = {
            'forecast': 'forecast',
            'MongoForecastingResultSink': 'forecast',
            'hierarchical-forecast': 'forecast',
            'matcher': 'matcher',
            'CovariateSelection': 'matcher',
            'associator': 'associator',
            'shaper': 'shaper',
        }
        endpoint = endpoint_by_type.get(report_type)
        if endpoint is None:
            raise RuntimeError(f'Unsupported report type {report_type}')

        raw_result = self._request('GET', f'/api/v1/{endpoint}/{report_id}/status',
                                   params={'include_error_reason': include_error_reason})

        status = ReportStatus(**raw_result)
        # Cache only once every requested computation has finished.
        if status.progress.requested == status.progress.finished and status.is_finished:
            self.report_status_cache[cache_key] = status

        return status

    @validate_call
    def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
        """Gets the current status of a report.

        If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

        Parameters
        ----------
        id
            Report identifier or plain report ID.
        include_error_reason
            Determines whether log messages are to be included in the result.

        Returns
        -------
        The status of the report.
        """
        if isinstance(id, ReportIdentifier):
            identifier = id
        else:
            identifier = ReportIdentifier(report_id=id, settings_id=None)

        status = self._get_single_report_status(report_identifier=identifier,
                                                include_error_reason=include_error_reason)
        # Chained identifiers carry prerequisite reports whose status is gathered recursively.
        if isinstance(identifier, ChainedReportIdentifier):
            status.prerequisites.extend(
                self.get_report_status(id=prerequisite, include_error_reason=include_error_reason)
                for prerequisite in identifier.prerequisites)
        return status

    def _can_load_results(self, report_status: ReportStatus) -> bool:
        """Checks whether results of a report can be returned and emits log messages.

        Parameters
        ----------
        report_status
            Status of the report whose results are about to be loaded.

        Returns
        -------
        True if (possibly incomplete) results can be returned, False otherwise.
        """
        progress = report_status.progress
        results = report_status.results

        # Nothing finished yet: there is nothing to load.
        if progress.finished == 0:
            logger.warning('The report is not finished. No results to return.')
            return False

        if progress.finished != progress.requested:
            logger.warning('The report is not finished.')

        still_running = progress.finished < progress.requested

        if report_status.result_type == 'matcher':
            if still_running and results.successful > 0:
                logger.warning('The report is not finished. Returning incomplete results.')
                return True
            if results.successful == 0:
                logger.warning('No results to return. Check `get_report_status` for details.')
                return False
        else:
            # Non-matcher reports additionally count runs without evaluation as loadable.
            if still_running and (results.successful > 0 or results.no_evaluation > 0):
                logger.warning('The report is not finished. Returning incomplete results.')
                return True
            if results.successful == 0 and results.no_evaluation == 0:
                logger.warning(
                    'Zero runs were successful. No results can be returned. Check `get_report_status` for details.')
                return False

        return True

    @validate_call
    def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
        """Get report type.

        Parameters
        ----------
        report_identifier
            Report ID or identifier.

        Returns
        -------
        Report type string.
        """
        if isinstance(report_identifier, ReportIdentifier):
            report_id = report_identifier.report_id
        else:
            report_id = report_identifier

        report_info = self._request('GET', f'/api/v1/report/{report_id}')
        return str(report_info['type'])

    @validate_call
    def start_making_forecast_consistent(
        self,
        config: MakeForecastConsistentConfiguration
    ) -> ReportIdentifier:
        """Start hierarchical forecast reconciliation process.

        Makes forecasts consistent across hierarchical levels.

        Parameters
        ----------
        config
            Configuration for the reconciliation process.

        Returns
        -------
        ReportIdentifier with report_id and settings_id.
        """
        # Optional fields are only included in the request when they are set.
        optional_parts: Dict[str, Any] = {}
        if config.db_name:
            optional_parts['db_name'] = config.db_name
        if config.reconciliation:
            optional_parts['reconciliation'] = config.reconciliation.model_dump()

        payload: Dict[str, Any] = {
            'data_selection': config.data_selection.model_dump(),
            'report_note': config.report_note,
            **optional_parts,
        }

        logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
        response = self._request('POST', '/api/v1/forecast/reconcile', json_data=payload)
        identifier = ReportIdentifier.model_validate(response)
        logger.info(f'Report created with ID {identifier.report_id}. Reconciliation is running...')
        return identifier

    @validate_call
    def create_scenario_values(self,
                               config: ScenarioValuesConfig) -> ShaperConfig:
        """Creates scenario values for covariates based on a time series and forecast horizon.

        Parameters
        ----------
        config
            Configuration for the creation of scenario values.

        Returns
        -------
        A ShaperConfig whose scenarios contain high and low projections for each covariate.
        """
        request_body = {'config': config.model_dump(mode='json')}
        response = self._request('POST', '/api/v1/shaper/prepare', json_data=request_body)
        return ShaperConfig(**response)

    @validate_call
    def start_scenario_forecast(self, config: ShaperConfig) -> ReportIdentifier:
        """Start forecast for scenarios.

        Parameters
        ----------
        config
            Configuration for a SHAPER run.

        Returns
        -------
        ReportIdentifier of the started SHAPER report.
        """
        # Work on a deep copy so the caller's config is not mutated.
        prepared = copy.deepcopy(config)
        for scenario in prepared.scenarios:
            ts = scenario.ts
            # Full covariates are sent as lightweight name+lag references on the wire.
            if isinstance(ts, Covariate):
                scenario.ts = CovariateRef(name=ts.ts.name, lag=ts.lag)

        response = self._request('POST', '/api/v1/shaper',
                                 json_data={'config': prepared.model_dump(mode='json')})
        identifier = ReportIdentifier.model_validate(response)
        logger.info(f'Report created with ID {identifier.report_id}. Shaping scenarios...')
        return identifier

    @staticmethod
    def from_user_password(dotenv_path: Optional[str] = None) -> ExpertClient:
        """Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

        Parameters
        ----------
        dotenv_path
            Optional path to a .env file to load before reading the environment.

        Returns
        -------
        An ExpertClient authenticated via the obtained refresh token.
        """
        load_dotenv(dotenv_path=dotenv_path)
        environment = cast(Literal['production', 'staging', 'development'], os.getenv('FUTURE_ENVIRONMENT'))

        # Both credentials are mandatory; report which one is missing.
        credentials = {}
        for env_var, credential_name in (('FUTURE_USER', 'username'), ('FUTURE_PW', 'password')):
            try:
                credentials[credential_name] = os.environ[env_var]
            except KeyError:
                raise MissingCredentialsError(credential_name) from None

        auth_client = FutureAuthClient(environment=environment)
        token = auth_client.token(credentials['username'], credentials['password'])
        return ExpertClient(refresh_token=token['refresh_token'], environment=environment)

    @validate_call
    def get_shaper_results(self, id: Union[ReportIdentifier, int]) -> Optional[ShaperResult]:
        """Gets the results from the given report.

        Parameters
        ----------
        id
            Report identifier or plain report ID.

        Returns
        -------
        Results of the SHAPER report, or None if no results can be loaded yet.
        """
        if isinstance(id, ReportIdentifier):
            report_id = id.report_id
        else:
            report_id = id

        status = self.get_report_status(id=id)
        if not self._can_load_results(status):
            return None

        response = self._request('GET', f'/api/v1/shaper/{report_id}/results')
        return ShaperResult(**response)

    def logout(self) -> None:
        """Logout from futureEXPERT.

        If logged in with a refresh token, the refresh token is revoked.

        Raises
        ------
        RuntimeError
            If no refresh token is available to revoke.
        """
        refresh_token = self._oauth_token.get('refresh_token')
        if refresh_token is None:
            raise RuntimeError('Cannot logout without refresh_token')
        self.auth_client.logout(refresh_token)
        logger.info('Successfully logged out.')

Client for the FutureEXPERT REST API.

This client provides the same interface as futureexpert.ExpertClient but communicates with the expert-api REST API instead of directly with the backend.

It can be used as a drop-in replacement for ExpertClient when you want to use the REST API instead of the Python SDK.

Initialize the client from a token.

If you want to login using username and password, consider using ExpertClient.from_user_password.

Parameters

refresh_token

Authentication refresh token for Bearer authentication. If not provided, uses environment variable FUTURE_REFRESH_TOKEN.

You can retrieve a long-lived refresh token (offline token) in the user settings of the futureEXPERT Dashboard or using the Open ID Connect token endpoint of our identity provider.

Example for calling the token endpoint with scope offline_access: curl -s -X POST "https://future-auth.prognostica.de/realms/future/protocol/openid-connect/token" -H "Content-Type: application/x-www-form-urlencoded" --data-urlencode "client_id=expert" --data-urlencode "grant_type=password" --data-urlencode "scope=openid offline_access" --data-urlencode "username=$FUTURE_USER" --data-urlencode "password=$FUTURE_PW" | jq -r .refresh_token

access_token

Authentication access token for Bearer authentication.

If used instead of refresh_token, no automated token refresh is possible.

group
Optional group name for users in multiple groups. If not provided, uses environment variable FUTURE_GROUP.
environment
Optional environment (production, staging, development). If not provided, uses environment variable FUTURE_ENVIRONMENT.
timeout
Request timeout in seconds (default: 300)
max_retries
Maximum number of retries for failed requests (default: 3)

Static methods

def from_user_password(dotenv_path: Optional[str] = None) ‑> ExpertClient
Expand source code
@staticmethod
def from_user_password(dotenv_path: Optional[str] = None) -> ExpertClient:
    """Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

Parameters
----------
dotenv_path: typing.Optional[builtins.str]

return: futureexpert.expert_client.ExpertClient

"""
    load_dotenv(dotenv_path=dotenv_path)
    environment = cast(Literal['production', 'staging', 'development'], os.getenv('FUTURE_ENVIRONMENT'))
    try:
        future_user = os.environ['FUTURE_USER']
    except KeyError:
        raise MissingCredentialsError('username') from None
    try:
        future_password = os.environ['FUTURE_PW']
    except KeyError:
        raise MissingCredentialsError('password') from None
    auth_client = FutureAuthClient(environment=environment)
    token = auth_client.token(future_user, future_password)
    return ExpertClient(refresh_token=token['refresh_token'], environment=environment)

Initialize ExpertClient from FUTURE_USER and FUTURE_PW in .env file or environment variables.

Parameters

dotenv_path : typing.Optional[builtins.str]
 
return : ExpertClient
 

Instance variables

prop oauth2_client : OAuth2Client
Expand source code
@property
def oauth2_client(self) -> OAuth2Client:
    # Create httpx client with retry transport
    transport = httpx.HTTPTransport(retries=self.max_retries)

    return OAuth2Client(
        client_id=self.auth_client.auth_configuration.auth_client_id,
        token_endpoint=self.auth_client.openid_configuration.token_endpoint,
        token_endpoint_auth_method=self.auth_client.auth_configuration.token_endpoint_auth_method,
        token=self._oauth_token,
        update_token=self._update_token,
        leeway=30,
        base_url=self.api_url,
        timeout=self.timeout,
        transport=transport
    )

Methods

def check_data_definition(self,
user_input_id: str,
file_uuid: str,
data_definition: DataDefinition,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
@validate_call
def check_data_definition(
    self,
    user_input_id: str,
    file_uuid: str,
    data_definition: DataDefinition,
    file_specification: FileSpecification = FileSpecification()
) -> Any:
    """Check data definition.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input.
    file_uuid: builtins.str
        UUID of the file.
    data_definition: futureexpert.checkin.DataDefinition
        Data definition specification.
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Validation result
    return: typing.Any

"""
    logger.info('Started data definition using CHECK-IN...')
    payload = {
        'user_input_id': user_input_id,
        'file_uuid': file_uuid,
        'data_definition': data_definition.model_dump(),
        'file_specification': file_specification.model_dump()
    }
    result = self._request('POST', '/api/v1/check-in/validate', json_data=payload)
    logger.info('Finished data definition.')
    return result

Check data definition.

Parameters

user_input_id : builtins.str
UUID of the user input.
file_uuid : builtins.str
UUID of the file.
data_definition : DataDefinition
Data definition specification.
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Validation result
 
return : typing.Any
 
def check_in_pool_covs(self,
requested_pool_covs: List[PoolCovDefinition],
description: Optional[str] = None) ‑> CheckInPoolResult
Expand source code
@validate_call
def check_in_pool_covs(
    self,
    requested_pool_covs: List[PoolCovDefinition],
    description: Optional[str] = None
) -> CheckInPoolResult:
    """Create a new version from pool covariates.

    Parameters
    ----------
    requested_pool_covs: typing.list[futureexpert.pool.PoolCovDefinition]
        List of pool covariate definitions.
    description: typing.Optional[builtins.str]
        Short description of the selected covariates.

    Returns
    -------
    CheckInPoolResult with version_id and metadata
    return: futureexpert.pool.CheckInPoolResult

"""
    logger.info('Creating time series using checkin-pool...')
    payload = {
        'requested_pool_covs': [cov.model_dump() for cov in requested_pool_covs],
        'description': description
    }
    result = self._request('POST', '/api/v1/check-in/pool-covariate', json_data=payload)
    logger.info('Finished time series creation.')
    return CheckInPoolResult(**result)

Create a new version from pool covariates.

Parameters

requested_pool_covs : typing.list[PoolCovDefinition]
List of pool covariate definitions.
description : typing.Optional[builtins.str]
Short description of the selected covariates.

Returns

CheckInPoolResult with version_id and metadata
 
return : CheckInPoolResult
 
def check_in_time_series(self,
raw_data_source: Union[pd.DataFrame, Path, str],
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> str
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def check_in_time_series(
    self,
    raw_data_source: Union[pd.DataFrame, Path, str],
    data_definition: Optional[DataDefinition] = None,
    config_ts_creation: Optional[TsCreationConfig] = None,
    config_checkin: Optional[str] = None,
    file_specification: FileSpecification = FileSpecification()
) -> str:
    """Check in time series data.

    Only available in `Standard`, `Premium` and `Enterprise` subscription packages.

    Parameters
    ----------
    raw_data_source: typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
        DataFrame with raw data or path to CSV file.
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Data definition specification.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Time series creation configuration.
    config_checkin: typing.Optional[builtins.str]
        Path to JSON config file (alternative to data_definition + config_ts_creation).
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Version ID of the created time series
    return: builtins.str

"""
    form_data: Dict[str, Any] = {}
    files: Dict[str, Any] = {}

    if data_definition:
        form_data['data_definition'] = json.dumps(data_definition.model_dump())
    if config_ts_creation:
        form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
    if config_checkin:
        files['config_checkin'] = open(config_checkin, 'rb')
    if file_specification:
        form_data['file_specification'] = json.dumps(file_specification.model_dump())

    try:

        if isinstance(raw_data_source, pd.DataFrame):

            with tempfile.TemporaryDirectory() as tmpdir:
                time_stamp = datetime.now().strftime('%Y-%m-%d-%H%M%S')
                file_path = os.path.join(tmpdir, f'expert-{time_stamp}.csv')
                date_format = data_definition.date_column.format if data_definition else None
                raw_data_source.to_csv(path_or_buf=file_path, index=False, sep=file_specification.delimiter,
                                       decimal=file_specification.decimal, encoding='utf-8-sig',
                                       date_format=date_format)
                files['file'] = open(file_path, 'rb')
                result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)

        else:
            files['file'] = open(raw_data_source, 'rb')
            result = self._request('POST', '/api/v1/check-in', files=files or None, data=form_data)
        return str(result['version_id'])
    finally:
        for f in files.values():
            f.close()

Check in time series data.

Only available in Standard, Premium and Enterprise subscription packages.

Parameters

raw_data_source : typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
DataFrame with raw data or path to CSV file.
data_definition : typing.Optional[DataDefinition]
Data definition specification.
config_ts_creation : typing.Optional[TsCreationConfig]
Time series creation configuration.
config_checkin : typing.Optional[builtins.str]
Path to JSON config file (alternative to data_definition + config_ts_creation).
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Version ID of the created time series
 
return : builtins.str
 
def create_scenario_values(self, config: ScenarioValuesConfig) ‑> ShaperConfig
Expand source code
@validate_call
def create_scenario_values(self,
                           config: ScenarioValuesConfig) -> ShaperConfig:
    """Creates scenario values for covariates based on a time series and forecast horizon.

    Parameters
    ----------
    config: futureexpert.shaper.ScenarioValuesConfig
        Configuration for the creation of scenario values.

    Returns
    -------
    A list of Scenario objects containing high and low projections for each covariate.
    return: futureexpert.shaper.ShaperConfig

"""

    payload = {'config': config.model_dump(mode='json')}
    result = self._request('POST', '/api/v1/shaper/prepare', json_data=payload)
    return ShaperConfig(**result)

Creates scenario values for covariates based on a time series and forecast horizon.

Parameters

config : ScenarioValuesConfig
Configuration for the creation of scenario values.

Returns

A ShaperConfig whose scenarios contain high and low projections for each covariate.
return : ShaperConfig
 
def create_time_series(self,
user_input_id: str,
file_uuid: str,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> Any
Expand source code
@validate_call
def create_time_series(
    self,
    user_input_id: str,
    file_uuid: str,
    data_definition: Optional[DataDefinition] = None,
    config_ts_creation: Optional[TsCreationConfig] = None,
    config_checkin: Optional[str] = None,
    file_specification: FileSpecification = FileSpecification()
) -> Any:
    """Create time series from already uploaded data.

    This is the second step of the check-in process, after upload_data.

    Parameters
    ----------
    user_input_id: builtins.str
        UUID of the user input (from upload_data response).
    file_uuid: builtins.str
        UUID of the file (from upload_data response).
    data_definition: typing.Optional[futureexpert.checkin.DataDefinition]
        Data definition specification.
    config_ts_creation: typing.Optional[futureexpert.checkin.TsCreationConfig]
        Time series creation configuration.
    config_checkin: typing.Optional[builtins.str]
        Path to JSON config file (alternative to data_definition + config_ts_creation).
    file_specification: futureexpert.checkin.FileSpecification
        File specification for CSV parsing.

    Returns
    -------
    Time series creation result with version information
    return: typing.Any

"""
    logger.info('Creating time series using CHECK-IN...')
    form_data: Dict[str, Any] = {
        'user_input_id': user_input_id,
        'file_uuid': file_uuid,
    }

    if data_definition:
        form_data['data_definition'] = json.dumps(data_definition.model_dump())
    if config_ts_creation:
        form_data['config_ts_creation'] = json.dumps(config_ts_creation.model_dump())
    if file_specification:
        form_data['file_specification'] = json.dumps(file_specification.model_dump())

    files: Dict[str, Any] = {}
    if config_checkin:
        files['config_checkin'] = open(config_checkin, 'rb')

    try:
        result = self._request('POST', '/api/v1/check-in/create', files=files or None, data=form_data)
        logger.info('Finished time series creation.')
        return result
    finally:
        for f in files.values():
            f.close()

Create time series from already uploaded data.

This is the second step of the check-in process, after upload_data.

Parameters

user_input_id : builtins.str
UUID of the user input (from upload_data response).
file_uuid : builtins.str
UUID of the file (from upload_data response).
data_definition : typing.Optional[DataDefinition]
Data definition specification.
config_ts_creation : typing.Optional[TsCreationConfig]
Time series creation configuration.
config_checkin : typing.Optional[builtins.str]
Path to JSON config file (alternative to data_definition + config_ts_creation).
file_specification : FileSpecification
File specification for CSV parsing.

Returns

Time series creation result with version information
 
return : typing.Any
 
def get_associator_results(self, id: Union[ReportIdentifier, int]) ‑> AssociatorResult | None
Expand source code
@validate_call
def get_associator_results(self, id: Union[ReportIdentifier, int]) -> Optional[AssociatorResult]:
    """Get associator results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.

    Returns
    -------
    Results of the ASSOCIATOR report.
    return: typing.Optional[futureexpert.associator.AssociatorResult]

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return None

    result = self._request('GET', f'/api/v1/associator/{report_id}/results')
    return AssociatorResult(**result)

Get associator results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.

Returns

Results of the ASSOCIATOR report.
return : typing.Optional[AssociatorResult]
 
def get_data(self) ‑> Any
Expand source code
def get_data(self) -> Any:
    """Get available raw data.

    Returns
    -------
    Meta information of the data already uploaded.
    

Parameters
----------
return: typing.Any

"""
    return self._request('GET', '/api/v1/check-in/data')

Get available raw data.

Returns
-------
Meta information of the data already uploaded.

Parameters

return : typing.Any
 
def get_fc_results(self,
id: Union[ReportIdentifier, int],
include_k_best_models: int = 1,
include_backtesting: bool = False,
include_discarded_models: bool = False) ‑> ForecastResults
Expand source code
@validate_call
def get_fc_results(
    self,
    id: Union[ReportIdentifier, int],
    include_k_best_models: int = 1,
    include_backtesting: bool = False,
    include_discarded_models: bool = False
) -> ForecastResults:
    """Get forecast results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.
    include_k_best_models: builtins.int
        Number of best models to include.
    include_backtesting: builtins.bool
        Include backtesting results.
    include_discarded_models: builtins.bool
        Include discarded models.

    Returns
    -------
    ForecastResults with forecast data
    return: futureexpert.forecast.ForecastResults

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id
    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return ForecastResults(forecast_results=[])

    params = {
        'include_k_best_models': include_k_best_models,
        'include_backtesting': include_backtesting,
        'include_discarded_models': include_discarded_models
    }
    result = self._request('GET', f'/api/v1/forecast/{report_id}/results', params=params)

    # Parse results
    forecast_results = [ForecastResult.model_validate(r) for r in result['forecast_results']]
    fc_results = ForecastResults(forecast_results=forecast_results)

    if result.get('consistency') is not None:
        fc_results.consistency = ConsistentForecastMetadata.model_validate(result['consistency'])

    return fc_results

Get forecast results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.
include_k_best_models : builtins.int
Number of best models to include.
include_backtesting : builtins.bool
Include backtesting results.
include_discarded_models : builtins.bool
Include discarded models.

Returns

ForecastResults with forecast data
 
return : ForecastResults
 
def get_matcher_results(self, id: Union[ReportIdentifier, int]) ‑> List[MatcherResult]
Expand source code
@validate_call
def get_matcher_results(self, id: Union[ReportIdentifier, int]) -> List[MatcherResult]:
    """Get matcher results.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or report ID.

    Returns
    -------
    List of MatcherResult objects
    return: typing.list[futureexpert.matcher.MatcherResult]

"""
    report_id = id.report_id if isinstance(id, ReportIdentifier) else id

    report_status = self.get_report_status(id=id)
    has_results = self._can_load_results(report_status)

    if not has_results:
        return []

    result = self._request('GET', f'/api/v1/matcher/{report_id}/results')
    return [MatcherResult(**r) for r in result]

Get matcher results.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or report ID.

Returns

List of MatcherResult objects
 
return : typing.list[MatcherResult]
 
def get_pool_cov_overview(self, granularity: Optional[str] = None, search: Optional[str] = None) ‑> PoolCovOverview
Expand source code
@validate_call
def get_pool_cov_overview(
    self,
    granularity: Optional[str] = None,
    search: Optional[str] = None
) -> PoolCovOverview:
    """Get overview of available pool covariates.

    Parameters
    ----------
    granularity: typing.Optional[builtins.str]
        Filter by granularity (Day or Month).
    search: typing.Optional[builtins.str]
        Full-text search query.

    Returns
    -------
    PoolCovOverview with available covariates
    return: futureexpert.pool.PoolCovOverview

"""
    params = {}
    if granularity:
        params['granularity'] = granularity
    if search:
        params['search'] = search

    result = self._request('GET', '/api/v1/pool', params=params)
    return PoolCovOverview(overview_json=result['overview_json'])

Get overview of available pool covariates.

Parameters

granularity : typing.Optional[builtins.str]
Filter by granularity (Day or Month).
search : typing.Optional[builtins.str]
Full-text search query.

Returns

PoolCovOverview with available covariates
 
return : PoolCovOverview
 
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) ‑> ReportStatus
Expand source code
@validate_call
def get_report_status(self, id: Union[ReportIdentifier, int], include_error_reason: bool = True) -> ReportStatus:
    """Gets the current status of a report.

    If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

    Parameters
    ----------
    id: typing.Union[futureexpert.shared_models.ReportIdentifier, builtins.int]
        Report identifier or plain report ID.
    include_error_reason: builtins.bool
        Determines whether log messages are to be included in the result.

    Returns
    -------
    The status of the report.
    return: futureexpert.shared_models.ReportStatus

"""
    identifier = id if isinstance(id, ReportIdentifier) else ReportIdentifier(report_id=id, settings_id=None)

    final_status = self._get_single_report_status(
        report_identifier=identifier, include_error_reason=include_error_reason)
    if isinstance(identifier, ChainedReportIdentifier):
        for prerequisite_identifier in identifier.prerequisites:
            prerequisite_status = self.get_report_status(id=prerequisite_identifier,
                                                         include_error_reason=include_error_reason)
            final_status.prerequisites.append(prerequisite_status)
    return final_status

Gets the current status of a report.

If the provided report identifier includes prerequisites, the status of the prerequisites is included, too.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.
include_error_reason : builtins.bool
Determines whether log messages are to be included in the result.

Returns

The status of the report.
return : ReportStatus
 
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) ‑> str
Expand source code
@validate_call
def get_report_type(self, report_identifier: Union[int, ReportIdentifier]) -> str:
    """Get report type.

    Parameters
    ----------
    report_identifier
        Report ID or identifier.

    Returns
    -------
    Report type string.
    """
    # Accept either a full identifier or a bare report ID.
    if isinstance(report_identifier, ReportIdentifier):
        plain_id = report_identifier.report_id
    else:
        plain_id = report_identifier

    response = self._request('GET', f'/api/v1/report/{plain_id}')
    return str(response['type'])

Get report type.

Parameters

report_identifier : typing.Union[builtins.int, ReportIdentifier]
Report ID or identifier.

Returns

Report type string
 
return : builtins.str
 
def get_reports(self, skip: int = 0, limit: int = 100) ‑> PydanticModelList[ReportSummary]
Expand source code
@validate_call
def get_reports(self, skip: int = 0, limit: int = 100) -> PydanticModelList[ReportSummary]:
    """Get list of available reports.

    Parameters
    ----------
    skip
        Number of items to skip.
    limit
        Maximum number of items to return.

    Returns
    -------
    The available reports from newest to oldest.
    """
    raw_reports = self._request('GET', '/api/v1/report', params={'skip': skip, 'limit': limit})
    return PydanticModelList(list(map(ReportSummary.model_validate, raw_reports)))

Get list of available reports.

Parameters

skip : builtins.int
Number of items to skip.
limit : builtins.int
Maximum number of items to return.

Returns

The available reports from newest to oldest.
return : PydanticModelList[ReportSummary]
 
def get_shaper_results(self, id: Union[ReportIdentifier, int]) ‑> ShaperResult | None
Expand source code
@validate_call
def get_shaper_results(self, id: Union[ReportIdentifier, int]) -> Optional[ShaperResult]:
    """Gets the results from the given report.

    Parameters
    ----------
    id
        Report identifier or plain report ID.

    Returns
    -------
    Results of the SHAPER report, or None if the report has no loadable results yet.
    """
    # Only fetch results once the report status says they can be loaded.
    report_status = self.get_report_status(id=id)
    if not self._can_load_results(report_status):
        return None

    plain_id = id.report_id if isinstance(id, ReportIdentifier) else id
    payload = self._request('GET', f'/api/v1/shaper/{plain_id}/results')
    return ShaperResult(**payload)

Gets the results from the given report.

Parameters

id : typing.Union[ReportIdentifier, builtins.int]
Report identifier or plain report ID.

Returns

Results of the SHAPER report.
return : typing.Optional[ShaperResult]
 
def get_time_series(self, version_id: str) ‑> CheckInResult
Expand source code
@validate_call
def get_time_series(self, version_id: str) -> CheckInResult:
    """Get time series data by version ID.

    Parameters
    ----------
    version_id
        Time series version ID.

    Returns
    -------
    CheckInResult with the time series data.
    """
    raw_result = self._request('GET', f'/api/v1/ts/{version_id}')
    return CheckInResult(**raw_result)

Get time series data by version ID.

Parameters

version_id : builtins.str
Time series version ID.

Returns

CheckInResult with time series data
 
return : CheckInResult
 
def get_ts_versions(self, skip: int = 0, limit: int = 100) ‑> PydanticModelList[TimeSeriesVersion]
Expand source code
@validate_call
def get_ts_versions(self, skip: int = 0, limit: int = 100) -> PydanticModelList[TimeSeriesVersion]:
    """Get list of time series versions.

    Parameters
    ----------
    skip
        Number of items to skip.
    limit
        Maximum number of items to return.

    Returns
    -------
    The available time series versions.
    """
    params = {'skip': skip, 'limit': limit}
    results = self._request('GET', '/api/v1/ts', params=params)
    return PydanticModelList([TimeSeriesVersion.model_validate(raw_result) for raw_result in results])

Get list of time series versions.

Parameters

skip : builtins.int
Number of items to skip.
limit : builtins.int
Maximum number of items to return.

Returns

The available time series versions.
 
return : PydanticModelList[TimeSeriesVersion]
 
def logout(self) ‑> None
Expand source code
def logout(self) -> None:
    """Logout from futureEXPERT.

    If logged in with a refresh token, the refresh token is revoked.

    Raises
    ------
    RuntimeError
        If no refresh token is available to revoke.
    """
    if (refresh_token := self._oauth_token.get('refresh_token')) is None:
        raise RuntimeError('Cannot logout without refresh_token')
    self.auth_client.logout(refresh_token)
    logger.info('Successfully logged out.')

Logout from futureEXPERT.

If logged in with a refresh token, the refresh token is revoked.

Parameters

return : builtins.NoneType
 
def start_associator(self, config: AssociatorConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_associator(self, config: AssociatorConfig) -> ReportIdentifier:
    """Start an associator report.

    Parameters
    ----------
    config
        Associator configuration.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    """
    response = self._request('POST', '/api/v1/associator', json_data={'config': config.model_dump()})
    identifier = ReportIdentifier.model_validate(response)
    logger.info(f'Report created with ID {identifier.report_id}. Associator is running...')
    return identifier

Start an associator report.

Parameters

config : AssociatorConfig
Associator configuration.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_forecast(self,
version: str,
config: ReportConfig,
reconciliation_config: Optional[ReconciliationConfig] = None) ‑> ReportIdentifier | ChainedReportIdentifier
Expand source code
@validate_call
def start_forecast(
    self,
    version: str,
    config: ReportConfig,
    reconciliation_config: Optional[ReconciliationConfig] = None
) -> Union[ReportIdentifier, ChainedReportIdentifier]:
    """Start a forecasting report.

    Parameters
    ----------
    version
        Time series version ID.
    config
        Forecast configuration.
    reconciliation_config
        Optional configuration to make forecasts consistent over hierarchical levels.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    If reconciliation_config is provided, returns a ChainedReportIdentifier
    with prerequisites containing the forecast report identifier.
    """
    payload: Dict[str, Any] = {
        'version': version,
        'config': config.model_dump()
    }

    if reconciliation_config is not None:
        payload['reconciliation_config'] = reconciliation_config.model_dump()

    logger.info('Started creating FORECAST...')
    result = self._request('POST', '/api/v1/forecast', json_data=payload)

    # The API responds with a chained identifier (carrying prerequisite reports)
    # only when a reconciliation step was requested.
    identifier_model = ChainedReportIdentifier if 'prerequisites' in result else ReportIdentifier
    report_identifier = identifier_model.model_validate(result)
    logger.info(f'Report created with ID {report_identifier.report_id}. Forecasts are running...')
    return report_identifier

Start a forecasting report.

Parameters
----------
version: builtins.str
    Time series version ID.
config: futureexpert.forecast.ReportConfig
    Forecast configuration.
reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]
    Configuration to make forecasts consistent over hierarchical levels.

Returns
-------
ReportIdentifier with report_id and settings_id.
If reconciliation_config is provided, returns ChainedReportIdentifier
with prerequisites containing the forecast report identifier.
reconciliation_config: typing.Optional[futureexpert.forecast_consistency.ReconciliationConfig]

return: typing.Union[futureexpert.shared_models.ReportIdentifier, futureexpert.shared_models.ChainedReportIdentifier]

def start_forecast_from_raw_data(self,
raw_data_source: Union[pd.DataFrame, Path, str],
config_fc: ReportConfig,
data_definition: Optional[DataDefinition] = None,
config_ts_creation: Optional[TsCreationConfig] = None,
config_checkin: Optional[str] = None,
file_specification: FileSpecification = FileSpecification(delimiter=',', decimal='.', thousands=None)) ‑> ReportIdentifier
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def start_forecast_from_raw_data(self,
                                 raw_data_source: Union[pd.DataFrame, Path, str],
                                 config_fc: ReportConfig,
                                 data_definition: Optional[DataDefinition] = None,
                                 config_ts_creation: Optional[TsCreationConfig] = None,
                                 config_checkin: Optional[str] = None,
                                 file_specification: FileSpecification = FileSpecification()) -> ReportIdentifier:
    """Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

    Parameters
    ----------
    raw_data_source
        A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
    config_fc
        The configuration of the forecast run.
    data_definition
        Specifies the data, value and group columns and which rows and columns should be removed.
    config_ts_creation
        Defines filter and aggregation level of the time series.
    config_checkin
        Path to the JSON file with the CHECK-IN configuration. `config_ts_creation` and `config_checkin`
        cannot be set simultaneously. The configuration may be obtained from the last step of
        CHECK-IN using the future frontend (now.future-forecasting.de).
    file_specification
        Needed if a CSV is used with e.g. German format.

    Returns
    -------
    The identifier of the forecasting report.
    """
    # A rerun references an existing time series version, while this convenience
    # method creates a fresh one - the two features are mutually exclusive.
    assert config_fc.rerun_report_id is None, 'start_forecast_from_raw_data can not be used with rerun_report_id.'

    upload_feedback = self.upload_data(source=raw_data_source, file_specification=file_specification)

    user_input_id = upload_feedback['uuid']
    file_id = upload_feedback['files'][0]['uuid']

    # CHECK-IN step: turn the uploaded raw data into a time series version.
    checkin_result = self.create_time_series(user_input_id=user_input_id,
                                             file_uuid=file_id,
                                             data_definition=data_definition,
                                             config_ts_creation=config_ts_creation,
                                             config_checkin=config_checkin,
                                             file_specification=file_specification)

    version = checkin_result['result']['tsVersion']
    return self.start_forecast(version=version, config=config_fc)

Starts a forecast run from raw data without the possibility to inspect interim results from the data preparation.

Parameters

raw_data_source : typing.Union[pandas.DataFrame, pathlib.Path, builtins.str]
A Pandas DataFrame that contains the raw data or path to where the CSV file with the data is stored.
config_fc : ReportConfig
The configuration of the forecast run.
data_definition : typing.Optional[DataDefinition]
Specifies the data, value and group columns and which rows and columns should be removed.
config_ts_creation : typing.Optional[TsCreationConfig]
Defines filter and aggregation level of the time series.
config_checkin : typing.Optional[builtins.str]
Path to the JSON file with the CHECK-IN configuration. config_ts_creation and config_checkin cannot be set simultaneously. The configuration may be obtained from the last step of CHECK-IN using the future frontend (now.future-forecasting.de).
file_specification : FileSpecification
Needed if a CSV is used with e.g. German format.

Returns

The identifier of the forecasting report.
return : ReportIdentifier
 
def start_making_forecast_consistent(self, config: MakeForecastConsistentConfiguration) ‑> ReportIdentifier
Expand source code
@validate_call
def start_making_forecast_consistent(
    self,
    config: MakeForecastConsistentConfiguration
) -> ReportIdentifier:
    """Start hierarchical forecast reconciliation process.

    Makes forecasts consistent across hierarchical levels.

    Parameters
    ----------
    config
        Configuration for the reconciliation process.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    """
    request_body: Dict[str, Any] = {
        'data_selection': config.data_selection.model_dump(),
        'report_note': config.report_note,
    }

    # Optional settings are only transmitted when actually set.
    if config.db_name:
        request_body['db_name'] = config.db_name
    if config.reconciliation:
        request_body['reconciliation'] = config.reconciliation.model_dump()

    logger.info('Started creating hierarchical reconciliation for consistent forecasts...')
    response = self._request('POST', '/api/v1/forecast/reconcile', json_data=request_body)
    identifier = ReportIdentifier.model_validate(response)
    logger.info(f'Report created with ID {identifier.report_id}. Reconciliation is running...')
    return identifier

Start hierarchical forecast reconciliation process.

Makes forecasts consistent across hierarchical levels.

Parameters

config : MakeForecastConsistentConfiguration
Configuration for the reconciliation process.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_matcher(self, config: MatcherConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_matcher(self, config: MatcherConfig) -> ReportIdentifier:
    """Start a covariate matcher report.

    Parameters
    ----------
    config
        Matcher configuration.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    """
    response = self._request('POST', '/api/v1/matcher', json_data={'config': config.model_dump()})
    identifier = ReportIdentifier.model_validate(response)
    logger.info(f'Report created with ID {identifier.report_id}. Matching indicators...')
    return identifier

Start a covariate matcher report.

Parameters

config : MatcherConfig
Matcher configuration.

Returns

ReportIdentifier with report_id and settings_id
 
return : ReportIdentifier
 
def start_scenario_forecast(self, config: ShaperConfig) ‑> ReportIdentifier
Expand source code
@validate_call
def start_scenario_forecast(self, config: ShaperConfig) -> ReportIdentifier:
    """Start forecast for scenarios.

    Parameters
    ----------
    config
        Configuration for a SHAPER run.

    Returns
    -------
    ReportIdentifier with report_id and settings_id.
    """
    # Work on a deep copy so the caller's config object is never mutated.
    ref_config = copy.deepcopy(config)
    # Full covariates are reduced to lightweight references before transmission.
    for scenario in ref_config.scenarios:
        if isinstance(scenario.ts, Covariate):
            scenario.ts = CovariateRef(name=scenario.ts.ts.name, lag=scenario.ts.lag)

    payload = {'config': ref_config.model_dump(mode='json')}

    result = self._request('POST', '/api/v1/shaper', json_data=payload)
    report = ReportIdentifier.model_validate(result)
    logger.info(f'Report created with ID {report.report_id}. Shaping scenarios...')
    return report

Start forecast for scenarios.

Parameters

config : ShaperConfig
Configuration for a SHAPER run.
return : ReportIdentifier
 
def upload_data(self,
source: Union[pd.DataFrame, str],
file_specification: Optional[FileSpecification] = None) ‑> Any
Expand source code
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def upload_data(
    self,
    source: Union[pd.DataFrame, str],
    file_specification: Optional[FileSpecification] = None
) -> Any:
    """Upload raw data for further processing.

    Parameters
    ----------
    source
        Path to a CSV file or a pandas DataFrame.
    file_specification
        File specification for CSV parsing.

    Returns
    -------
    Upload feedback with user_input_id and file_uuid.
    """
    # Serialize the optional file specification once for either upload path.
    spec_json = json.dumps(file_specification.model_dump()) if file_specification else None

    if isinstance(source, pd.DataFrame):
        # A DataFrame is converted to JSON records and sent as form data.
        form_data = {
            'data': json.dumps(source.to_dict(orient='records'))
        }
        if spec_json is not None:
            form_data['file_specification'] = spec_json

        return self._request('POST', '/api/v1/check-in/data', data=form_data)

    # Otherwise treat the source as a path and upload the file itself.
    with open(source, 'rb') as upload_file:
        data = {}
        if spec_json is not None:
            data['file_specification'] = spec_json

        return self._request('POST', '/api/v1/check-in/data', files={'file': upload_file}, data=data)

Upload raw data for further processing.

Parameters

source : typing.Union[pandas.DataFrame, builtins.str]
Path to a CSV file or a pandas DataFrame.
file_specification : typing.Optional[FileSpecification]
File specification for CSV parsing.

Returns

Upload feedback with user_input_id and file_uuid
 
return : typing.Any
 
class MissingCredentialsError (missing_credential_type: str)
Expand source code
class MissingCredentialsError(RuntimeError):
    """Raised when a required credential was neither passed to the client nor found in the .env file."""

    def __init__(self, missing_credential_type: str) -> None:
        # Fixed duplicated word ("the the") in the user-facing message.
        super().__init__(f'Please enter {missing_credential_type} either when ' +
                         'initializing the expert client or in the .env file!')

Unspecified run-time error.

Ancestors

  • builtins.RuntimeError
  • builtins.Exception
  • builtins.BaseException