The following code solves the problem posed in the question, i.e., it retrieves the schedules that have at least one successfully finished job (a job being any component of the pipeline that runs on the schedule).
The issue, however, is that when a schedule has multiple runs, only the first run is considered.
A more interesting problem is to retrieve the schedules that have at least one completed run, or whose last run completed, or whose last run failed. I will address that in a separate post (please post an answer if you have a better solution).
```python
# -------------------------------------------------
# Connect to AML and set tracking URI in mlflow
# -------------------------------------------------
import mlflow
import pandas as pd
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
# Connect to AML
client = MLClient(
    credential=InteractiveBrowserCredential(),
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    workspace_name="my-workspace",
)
# set tracking uri if run locally
mlflow_tracking_uri = client.workspaces.get(client.workspace_name).mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
# -------------------------------------------------
# Retrieve and filter schedules
# -------------------------------------------------
schedules = client.schedules.list()
# optional: filter schedules based on name containing substring:
selected_schedules = [
    schedule
    for schedule in schedules
    if "inference_pipelin" in schedule.name
]
# -------------------------------------------------
# Get schedules that have *at least* one job (not one run) completed
# -------------------------------------------------
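# Map each experiment name to its schedule (assumes one schedule per experiment name)
schedule_by_experiment = {
    schedule.create_job.experiment_name: schedule for schedule in selected_schedules
}
experiment_names = list(schedule_by_experiment)
# String values in an MLflow filter need to be quoted
filter_string = " or ".join([f"(name = '{x}')" for x in experiment_names])
experiments = mlflow.search_experiments(filter_string=filter_string)
# Align each schedule with the experiments actually returned, whatever their order
experiments_df = pd.DataFrame(
    {
        "experiment_id": [exp.experiment_id for exp in experiments],
        "experiment_name": [exp.name for exp in experiments],
        "schedule": [schedule_by_experiment[exp.name] for exp in experiments],
    }
)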
all_runs = mlflow.search_runs(
experiment_names=experiment_names,
filter_string="tags.mlflow.user='Jaume Amores'",
)
# For each experiment, check whether any of its runs has status FINISHED
has_finished_run = all_runs.groupby("experiment_id")["status"].apply(lambda x: (x == "FINISHED").any())
finished_experiment_ids = has_finished_run[has_finished_run].index
selected_schedules = experiments_df[
    experiments_df["experiment_id"].isin(finished_experiment_ids)
]["schedule"].tolist()
```
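For the follow-up variants mentioned at the top (last run completed, last run failed), one possible approach is to sort the runs by start time and keep only the most recent run per experiment. This is just a minimal sketch, not something I have run against a live workspace: it uses a small hand-made DataFrame in place of the output of `mlflow.search_runs`, it assumes the `experiment_id`, `status` and `start_time` columns that `search_runs` returns, and the helper name `last_run_status` is made up for illustration.

```python
import pandas as pd


def last_run_status(runs: pd.DataFrame) -> pd.Series:
    """Return the status of the most recent run of each experiment (hypothetical helper)."""
    # Sort chronologically, then keep the last row of every experiment
    latest = runs.sort_values("start_time").groupby("experiment_id").tail(1)
    return latest.set_index("experiment_id")["status"]


# Hand-made stand-in for the DataFrame returned by mlflow.search_runs (made-up data)
runs = pd.DataFrame(
    {
        "experiment_id": ["1", "1", "2", "2"],
        "status": ["FINISHED", "FAILED", "FAILED", "FINISHED"],
        "start_time": pd.to_datetime(
            ["2024-01-01", "2024-01-02", "2024-01-01", "2024-01-02"]
        ),
    }
)

status = last_run_status(runs)
# Experiments whose last run completed / failed
last_completed = status[status == "FINISHED"].index.tolist()
last_failed = status[status == "FAILED"].index.tolist()
```

The resulting experiment IDs could then be mapped back to schedules through `experiments_df`, in the same way as in the snippet above.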
The first code snippet is slow if we have many experiments and runs (see the second code snippet below for a faster alternative):
```python
import mlflow
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
# Connect to AML
client = MLClient(
    credential=InteractiveBrowserCredential(),
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    workspace_name="my-workspace",
)
# set tracking uri if run locally
mlflow_tracking_uri = client.workspaces.get(client.workspace_name).mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
# select all runs
# (slow, see alternative in second code snippet below)
all_runs = mlflow.search_runs(search_all_experiments=True)
# select runs whose my-metric-name is bigger than my_threshold
metric_name = "my-metric-name"
my_threshold = 0.9
selected_runs = all_runs.loc[all_runs[f"metrics.{metric_name}"] > my_threshold]
# get corresponding experiment IDs:
selected_experiment_ids = selected_runs["experiment_id"].unique()
# retrieve all experiments
# (slow, see alternative in second code snippet below)
exps = mlflow.search_experiments()
# show experiments whose experiment-id matches the selected runs
for exp in exps:
    if exp.experiment_id in selected_experiment_ids:
        print(f"Experiment name: {exp.name}\nExperiment ID: {exp.experiment_id}")
```
If we can narrow down the list of candidate experiments, using a list of the experiment names we want to focus on, the name of the user who created the experiments, or both, we get a faster response. This is the resulting code, to be run after setting the tracking URI as above:
```python
# restrict experiment name to be one of the following:
experiment_names = ["my-first-experiment", "my-second-experiment", "my-third-experiment"]
# restrict user id to be the following:
user_id = "my-user-id"
# select runs
all_runs = mlflow.search_runs(
experiment_names=experiment_names,
filter_string=f"tags.mlflow.user='{user_id}'",
)
# select runs whose my-metric-name is bigger than my_threshold
metric_name = "my-metric-name"
my_threshold = 0.9
selected_runs = all_runs.loc[all_runs[f"metrics.{metric_name}"] > my_threshold]
# get corresponding experiment IDs:
selected_experiment_ids = selected_runs["experiment_id"].unique()
# retrieve all experiments whose name is in the experiment_names list
# (string values in an MLflow filter need to be quoted)
filter_string = " or ".join([f"(name = '{x}')" for x in experiment_names])
exps = mlflow.search_experiments(filter_string=filter_string)
# show experiments whose experiment-id matches the selected runs
for exp in exps:
    if exp.experiment_id in selected_experiment_ids:
        print(f"Experiment name: {exp.name}\nExperiment ID: {exp.experiment_id}")
```
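One caveat with both snippets: as far as I can tell, if none of the retrieved runs logged the metric, the `metrics.<name>` column is simply absent from the DataFrame and the indexing raises a `KeyError`. Below is a minimal sketch of a guard for that case. It uses a hand-made DataFrame rather than a live `mlflow.search_runs` call, and the helper name `experiments_above_threshold` is made up for illustration.

```python
import pandas as pd


def experiments_above_threshold(runs, metric_name, threshold):
    """Return IDs of experiments with a run whose metric exceeds the threshold (hypothetical helper)."""
    column = f"metrics.{metric_name}"
    # No run logged this metric: nothing to select
    if column not in runs.columns:
        return []
    # Runs that did not log the metric hold NaN, which never passes the comparison
    selected = runs.loc[runs[column] > threshold]
    return selected["experiment_id"].unique().tolist()


# Hand-made stand-in for the DataFrame returned by mlflow.search_runs (made-up data)
runs = pd.DataFrame(
    {
        "experiment_id": ["1", "1", "2"],
        "metrics.my-metric-name": [0.95, 0.5, float("nan")],
    }
)

ids = experiments_above_threshold(runs, "my-metric-name", 0.9)
missing = experiments_above_threshold(runs, "other-metric", 0.9)
```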