```python
# Global var: desired_status -- the run status that at least one run of a schedule's experiment
# must have for the schedule to be selected further down
# (change from "Completed" to "Failed" according to your requirements)
desired_status = "Completed"
# --------------------------------------------------------------------------------------------------
# Imports and connection to AML
# --------------------------------------------------------------------------------------------------
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azureml.core.experiment import Experiment
from azureml.core.workspace import Workspace
# Connect to AML: build the MLClient for the target workspace using an
# InteractiveBrowserCredential
browser_credential = InteractiveBrowserCredential()
client = MLClient(
    credential=browser_credential,
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    workspace_name="my-workspace",
)
# --------------------------------------------------------------------------------------------------
# Get workspace
# --------------------------------------------------------------------------------------------------
# Build the azureml-core Workspace handle from the identifiers already held by the MLClient,
# so the subscription / resource group / workspace names are only written once.
workspace = Workspace(
    subscription_id=client.subscription_id,
    resource_group=client.resource_group_name,
    workspace_name=client.workspace_name,
)
# --------------------------------------------------------------------------------------------------
# Retrieve schedules
# --------------------------------------------------------------------------------------------------
# Ask the MLClient for the workspace's schedules
schedules = client.schedules.list()
# optional: keep only the schedules whose name contains the desired pattern
selected_schedules = []
for candidate in schedules:
    if "inference" in candidate.name:
        selected_schedules.append(candidate)
# --------------------------------------------------------------------------------------------------
# Select schedule *names* that meet the criteria (at least one run with the desired status)
# --------------------------------------------------------------------------------------------------
finished_schedules = []
for schedule in selected_schedules:
    # Look up the experiment referenced by the schedule's job definition.
    experiment = Experiment(workspace, schedule.create_job.experiment_name)
    # any() stops consuming runs as soon as one with the desired status is found.
    if any(run.status == desired_status for run in experiment.get_runs()):
        finished_schedules.append(schedule.name)
```
If we instead want the schedules whose last run failed:
```python
# Collect the names of the schedules whose most recent run failed.
last_run_failed_schedules = []
for schedule in selected_schedules:
    experiment = Experiment(workspace, schedule.create_job.experiment_name)
    # The first run returned by get_runs() is taken as the last (most recent) run.
    # Default to None so an experiment without any run is skipped instead of
    # raising StopIteration.
    last_run = next(experiment.get_runs(), None)
    if last_run is not None and last_run.status == "Failed":
        last_run_failed_schedules.append(schedule.name)
```
The following code solves the problem posed by the question, i.e., it retrieves the schedules that have at least one job (any component in the pipeline that runs on the schedule) that finished successfully.
The issue, however, is that when a schedule has multiple runs, only the first run is considered.
A more interesting problem is to retrieve the schedules that have at least one completed run, or whose last run completed, or whose last run failed. I will address that question in a separate post (please answer the question if you have a better solution).
```python
# -------------------------------------------------
# Connect to AML and set tracking URI in mlflow
# -------------------------------------------------
import mlflow
import pandas as pd
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
# Connect to AML: the MLClient is bound to one workspace and authenticates with an
# InteractiveBrowserCredential
aml_credential = InteractiveBrowserCredential()
client = MLClient(
    credential=aml_credential,
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    workspace_name="my-workspace",
)
# set tracking uri if run locally: read it from the workspace the client is bound to
# and hand it to mlflow
aml_workspace = client.workspaces.get(client.workspace_name)
mlflow_tracking_uri = aml_workspace.mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
# -------------------------------------------------
# Retrieve and filter schedules
# -------------------------------------------------
schedules = client.schedules.list()
# optional: keep only the schedules whose name contains this substring
name_pattern = "inference_pipelin"
selected_schedules = [item for item in schedules if name_pattern in item.name]
# -------------------------------------------------
# Get schedules that have *at least* one job (not one run) completed
# -------------------------------------------------
# Group the selected schedules by the experiment name of their job, so that each experiment is
# tied to its schedule(s) by name rather than by list position (two schedules may point at the
# same experiment, and an experiment name may not resolve to an existing experiment).
schedules_by_experiment = {}
for schedule in selected_schedules:
    schedules_by_experiment.setdefault(schedule.create_job.experiment_name, []).append(schedule)
experiment_names = list(schedules_by_experiment)

# Resolve each experiment by exact name. get_experiment_by_name returns None for an unknown
# name, and no search filter string has to be assembled (and quoted) by hand.
experiments = [
    experiment
    for experiment in map(mlflow.get_experiment_by_name, experiment_names)
    if experiment is not None
]

# One row per (experiment, schedule) pair.
experiments_df = pd.DataFrame(
    [
        {
            "experiment_id": experiment.experiment_id,
            "experiment_name": experiment.name,
            "schedule": schedule,
        }
        for experiment in experiments
        for schedule in schedules_by_experiment[experiment.name]
    ],
    columns=["experiment_id", "experiment_name", "schedule"],
)

if experiments:
    all_runs = mlflow.search_runs(
        experiment_names=[experiment.name for experiment in experiments],
        # Only runs whose mlflow.user tag matches this user are considered; adapt as needed.
        filter_string="tags.mlflow.user='Jaume Amores'",
    )
else:
    # With no experiment names, search_runs would fall back to the active experiment,
    # so use an empty result instead of querying.
    all_runs = pd.DataFrame(columns=["experiment_id", "status"])

# Boolean Series indexed by experiment_id: True when at least one run has status FINISHED.
selected_experiments = (
    all_runs["status"].eq("FINISHED").groupby(all_runs["experiment_id"]).any()
)
finished_experiment_ids = selected_experiments[selected_experiments].index
selected_schedules = experiments_df.loc[
    experiments_df["experiment_id"].isin(finished_experiment_ids), "schedule"
].tolist()
```