"""Gantry's public API."""importosfromcontextlibimportExitStackfrompathlibimportPathfromtypingimportAny,LiteralfrombeakerimportBeakerfrom.importbeaker_utilsfrom.callbacksimport*from.git_utilsimportGitRepoStatefrom.launchimportfollow_workload,launch_experimentfrom.recipeimportRecipe__all__=["Recipe","GitRepoState","Callback","SlackCallback","launch_experiment","follow_workload","update_workload_description","write_metrics",]_ORIGINAL_WORKLOAD_DESCRIPTIONS:dict[str,str]={}
def update_workload_description(
    description: str,
    strategy: Literal["append", "prepend", "replace"] = "replace",
    beaker_token: str | None = None,
    client: Beaker | None = None,
) -> str:
    """
    Update the description of the Gantry workload that this process is running in.

    The workload's original description is fetched once per process and cached, so
    repeated "append"/"prepend" calls always combine the new text with the *original*
    description rather than stacking on top of previous updates.

    :param description: The description to set or add, depending on the ``strategy``.
    :param strategy: One of "append", "prepend", or "replace" to indicate how the new description
        should be combined with the original description. Defaults to "replace".
    :param beaker_token: An optional Beaker API token to use. If not provided, the ``BEAKER_TOKEN``
        environment variable will be used if set, or a Beaker config file.
        Alternatively you can provide an existing :class:`~beaker.Beaker` client via the
        ``client`` parameter.
    :param client: An optional existing :class:`~beaker.Beaker` client to use. If not provided,
        a new client will be created using the provided ``beaker_token`` or environment/config.

    :returns: The final, whitespace-stripped description that was set on the workload.

    :raises RuntimeError: If the ``BEAKER_WORKLOAD_ID`` environment variable is not set.
    :raises ValueError: If ``strategy`` is not one of "append", "prepend", or "replace".
    """
    if (workload_id := os.environ.get("BEAKER_WORKLOAD_ID")) is None:
        raise RuntimeError(
            "'update_workload_description' can only be called from within a running workload"
        )

    # Reject a bad strategy up front, before creating a client or making any API calls.
    if strategy not in ("append", "prepend", "replace"):
        raise ValueError(
            f"'strategy' must be one of 'append', 'prepend', or 'replace', but got '{strategy}'."
        )

    with ExitStack() as stack:
        if client is None:
            # We own this client, so register it on the stack to have it cleaned up on exit.
            beaker: Beaker = stack.enter_context(
                beaker_utils.init_client(
                    ensure_workspace=False,
                    beaker_token=beaker_token,
                    check_for_upgrades=False,
                )
            )
        else:
            # Caller-provided client: use it as-is and leave its lifetime to the caller.
            beaker = client

        workload = beaker.workload.get(workload_id)

        # Remember the description as it was the first time we saw this workload.
        # (The dict is only mutated, never rebound, so no ``global`` statement is needed.)
        if workload_id not in _ORIGINAL_WORKLOAD_DESCRIPTIONS:
            _ORIGINAL_WORKLOAD_DESCRIPTIONS[workload_id] = (
                workload.experiment.description or ""
            ).strip()
        og_description = _ORIGINAL_WORKLOAD_DESCRIPTIONS[workload_id]

        if strategy == "append":
            description = og_description + " " + description
        elif strategy == "prepend":
            description = description + " " + og_description
        # "replace" uses ``description`` unchanged.

        # Strip so an empty original description doesn't leave a dangling separator space.
        description = description.strip()
        beaker.workload.update(workload, description=description)

    return description
def write_metrics(metrics: dict[str, Any]) -> None:
    """
    Write result metrics for the Gantry workload that this process is running in.

    The metrics are written as JSON to ``metrics.json`` inside the directory named by the
    ``RESULTS_DIR`` environment variable. The directory is created if it doesn't exist,
    and an existing ``metrics.json`` is overwritten.

    :param metrics: A JSON-serializable dictionary of metrics to write.

    :raises RuntimeError: If the ``BEAKER_WORKLOAD_ID`` or ``RESULTS_DIR`` environment
        variable is not set.
    :raises TypeError: If ``metrics`` contains a value that is not JSON-serializable.
        Nothing is written in that case.
    :raises ValueError: If ``metrics`` cannot be encoded (e.g. it contains a circular
        reference). Nothing is written in that case.
    """
    import json

    if os.environ.get("BEAKER_WORKLOAD_ID") is None:
        raise RuntimeError("'write_metrics' can only be called from within a running workload")

    if (results_dir := os.environ.get("RESULTS_DIR")) is None:
        raise RuntimeError("Results directory not set! Can't write metrics.")

    # Serialize *before* opening the file: opening with "w" truncates it, so a
    # serialization error part-way through would otherwise destroy a previously
    # written metrics file and leave partial JSON behind.
    payload = json.dumps(metrics)

    metrics_path = Path(results_dir) / "metrics.json"
    metrics_path.parent.mkdir(exist_ok=True, parents=True)
    with metrics_path.open("w", encoding="utf-8") as f:
        f.write(payload)