Python SDK#

SkyPilot 提供了一个 Python SDK,CLI 在底层使用了它。

大多数 SDK 调用是**异步的并返回一个 future**(request ID 请求 ID)。

等待并获取结果

管理异步请求

有关更多详细信息,请参阅每个 API 的Request Returns(请求返回)和Request Raises(请求引发)部分。

注意

从 v0.8 或更早版本升级:如果您从 0.8.0 或更早版本升级到任何更新版本,您需要更新您的程序以适应新的异步执行模型。有关更多详细信息,请参阅迁移指南

集群 SDK#

sky.launch#

sky.launch(task, cluster_name=None, retry_until_up=False, idle_minutes_to_autostop=None, dryrun=False, down=False, backend=None, optimize_target=OptimizeTarget.COST, no_setup=False, clone_disk_from=None, fast=False, _need_confirmation=False, _is_launched_by_jobs_controller=False, _is_launched_by_sky_serve_controller=False, _disable_controller_check=False)[源代码]

启动集群或任务。

任务的 setup(设置)和 run(运行)命令在任务的 workdir 下执行(如果指定,它会同步到远程集群)。任务将在集群上进行作业队列调度。

当前,第一个参数必须是 sky.Task,或者(实验性高级用法)sky.Dag。后一种情况目前必须只包含一个单个任务;对 pipeline/通用 DAG 的支持仍在实验性分支中。

示例

import sky
task = sky.Task(run='echo hello SkyPilot')
task.set_resources(
    sky.Resources(cloud=sky.AWS(), accelerators='V100:4'))
sky.launch(task, cluster_name='my-cluster')
参数:
  • task (Union[Task, Dag]) – 要启动的 sky.Task 或 sky.Dag(实验性;仅限单个任务)。

  • cluster_name (Optional[str]) – 要创建/重用集群的名称。如果为 None,则自动生成名称。

  • retry_until_up (bool) – 是否重试启动集群直到它启动。

  • idle_minutes_to_autostop (Optional[int]) – 集群空闲(即集群作业队列中没有运行或等待中的作业)达到指定分钟数后自动停止。作业队列中发现 setting-up/运行/等待中的作业时,空闲计时器会重置。设置此标志等同于运行 sky.launch() 然后运行 sky.autostop(idle_minutes=<minutes>)。如果未设置,集群将不会自动停止。

  • dryrun (bool) – 如果为 True,则不实际启动集群。

  • down (bool) – 所有作业完成(成功或异常)后销毁集群。如果同时设置了 –idle-minutes-to-autostop,集群将在指定的空闲时间后销毁。请注意,如果在资源供应/数据同步/设置过程中发生错误,集群将不会被销毁,以便于调试。

  • backend (Optional[Backend]) – 要使用的后端。如果为 None,则使用默认后端 (CloudVMRayBackend)。

  • optimize_target (OptimizeTarget) – 要优化的目标。选项:OptimizeTarget.COST(成本),OptimizeTarget.TIME(时间)。

  • no_setup (bool) – 如果为 True,则不重新运行 setup 命令。

  • clone_disk_from (Optional[str]) – [实验性] 如果设置,则从指定的集群克隆磁盘。这对于将集群迁移到不同的可用区或区域非常有用。

  • fast (bool) – [实验性] 如果集群已启动并可用,则跳过资源供应和 setup 步骤。

  • _need_confirmation (bool) – (仅内部使用) 如果为 True,则显示确认提示。

返回类型:

str

返回:

启动请求的请求 ID。

请求返回:
  • job_id (Optional[int]) – 提交作业的作业 ID。如果后端不是 CloudVmRayBackend 或未向集群提交作业,则为 None。

  • handle (Optional[backends.ResourceHandle]) – 集群句柄。如果为 dryrun,则为 None。

请求引发异常:
  • exceptions.ClusterOwnerIdentityMismatchError – 如果集群归另一用户所有。

  • exceptions.InvalidClusterNameError – 如果集群名称无效。

  • exceptions.ResourcesMismatchError – 如果请求的资源与现有集群不匹配。

  • exceptions.NotSupportedError – 如果后端/云/集群不支持所需功能。

  • exceptions.ResourcesUnavailableError – 如果请求的资源无法满足。异常的 failover_history 将被设置为:

    1. Empty:当第一次 sky.optimize() 未能找到可行资源时;未尝试预检查或实际启动。

    2. Non-empty:当至少发生 1 个异常,可能是来自我们的预检查(例如集群名称无效)或某个区域/可用区抛出资源不可用。

  • exceptions.CommandError – 任何 ssh 命令错误。

  • exceptions.NoCloudAccessError – 如果所有云提供商都被禁用。

根据后端可能引发其他异常。

sky.stop#

sky.stop(cluster_name, purge=False)[源代码]

停止集群。

停止集群时,附加磁盘上的数据不会丢失。实例的计费会停止,但磁盘仍会收费。重新启动集群时,这些磁盘将重新附加。

目前,Spot 实例集群无法停止(GCP 除外,它允许在停止 Spot VM 时保留磁盘内容)。

参数:
  • cluster_name (str) – 要停止集群的名称。

  • purge (bool) – (高级) 即使在云端实际停止集群操作失败,也强制在 SkyPilot 的集群表中将集群标记为已停止。警告:此标志仅在某些手动故障排除场景中谨慎设置;设置此标志后,用户有责任确保没有泄露的实例和相关资源。

返回类型:

str

返回:

停止请求的请求 ID。

请求返回:

None

请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。

  • RuntimeError – 停止集群失败。

  • sky.exceptions.NotSupportedError – 如果指定的集群是 Spot 集群、TPU VM Pod 集群或托管作业控制器。

sky.start#

sky.start(cluster_name, idle_minutes_to_autostop=None, retry_until_up=False, down=False, force=False)[源代码]

重启集群。

如果集群之前已停止(状态为 STOPPED)或在资源供应/运行时安装中失败(状态为 INIT),此函数将尝试启动集群。在后一种情况下,将重试资源供应和运行时安装。

重启已停止的集群时,不使用自动故障转移资源供应。集群将在之前选择的相同云、区域和可用区上启动。

如果集群已处于 UP 状态,此函数无效。

参数:
  • cluster_name (str) – 要启动集群的名称。

  • idle_minutes_to_autostop (Optional[int]) – 集群空闲(即集群作业队列中没有运行或等待中的作业)达到指定分钟数后自动停止。作业队列中发现 setting-up/运行/等待中的作业时,空闲计时器会重置。设置此标志等同于运行 sky.launch() 然后运行 sky.autostop(idle_minutes=<minutes>)。如果未设置,集群将不会自动停止。

  • retry_until_up (bool) – 是否重试启动集群直到它启动。

  • down (bool) – 自动销毁集群:所有作业完成(成功或异常)后,在指定的空闲时间后销毁集群。需要设置 idle_minutes_to_autostop

  • force (bool) – 是否强制启动集群,即使它已启动。对升级 SkyPilot 运行时很有用。

返回类型:

str

返回:

启动请求的请求 ID。

请求返回:

None

请求引发异常:
  • ValueError – 参数值无效:(1) 如果 down 设置为 True 但 idle_minutes_to_autostop 为 None;(2) 如果指定的集群是托管作业控制器,并且 idle_minutes_to_autostop 不为 None 或 down 为 True(省略它们以使用默认的自动停止设置)。

  • sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。

  • sky.exceptions.NotSupportedError – 如果要重启的集群是使用不支持此操作的非默认后端启动的。

  • sky.exceptions.ClusterOwnerIdentitiesMismatchError – 如果要重启的集群是由不同用户启动的。

sky.down#

sky.down(cluster_name, purge=False)[源代码]

销毁集群。

销毁集群将删除所有相关资源(所有计费停止),并且附加磁盘上的任何数据都将丢失。集群中的加速器(例如 TPU)也将被删除。

参数:
  • cluster_name (str) – 要销毁集群的名称。

  • purge (bool) – (高级) 即使在云端实际终止集群失败,也强制从 SkyPilot 的集群表中删除该集群。警告:此标志仅在某些手动故障排除场景中谨慎设置;设置此标志后,用户有责任确保没有泄露的实例和相关资源。

返回类型:

str

返回:

销毁请求的请求 ID。

请求返回:

None

请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。

  • RuntimeError – 销毁集群失败。

  • sky.exceptions.NotSupportedError – 指定的集群是托管作业控制器。

sky.status#

sky.status(cluster_names=None, refresh=StatusRefreshMode.NONE, all_users=False)[源代码]

获取集群状态。

如果指定了 cluster_names,则返回这些集群。否则,返回所有集群。

每个集群可以具有以下状态之一:

  • INIT: 集群可能正在运行或已关闭。可能发生在以下情况:

    • 正在进行的资源供应或运行时设置。(sky.launch() 已启动但尚未完成。)

    • 或者,集群处于异常状态,例如,某些集群节点已关闭,或 SkyPilot 运行时不健康。(要恢复集群,请再次在其上尝试 sky launch。)

  • UP: 资源供应和运行时设置已成功,并且集群正在运行。(最近的 sky.launch() 已成功完成。)

  • STOPPED: 集群已停止且存储已持久化。使用 sky.start() 重启集群。

自动停止列

  • 自动停止列指示集群在空闲(没有作业运行)多少分钟后将自动停止。如果 to_down 为 True,集群将自动销毁而不是自动停止。

获取最新的集群状态

  • 在集群完全由 SkyPilot 管理(即没有在云控制台中进行手动操作)且未使用自动停止的正常情况下,此命令返回的表格将准确反映集群状态。

  • 在集群在 SkyPilot 外部发生更改(例如,在云控制台中进行手动操作;未管理的 Spot 集群被抢占)或对于启用了自动停止的集群,请使用 refresh=True 从云提供商查询最新的集群状态。

参数:
  • cluster_names (Optional[List[str]]) – 要查询的集群名称列表。如果未提供,将查询所有集群。

  • refresh (StatusRefreshMode) – 是否从云提供商查询最新的集群状态。

  • all_users (bool) – 是否包含所有用户的集群。默认情况下,仅包含当前用户的集群。

返回类型:

str

返回:

状态请求的请求 ID。

请求返回:

cluster_records (List[Dict[str, Any]]) – 一个字典列表,每个字典包含一个集群的信息。如果发现集群已终止或未找到,将从返回列表中省略。

{
  'name': (str) cluster name,
  'launched_at': (int) timestamp of last launch on this cluster,
  'handle': (ResourceHandle) an internal handle to the cluster,
  'last_use': (str) the last command/entrypoint that affected this
  cluster,
  'status': (sky.ClusterStatus) cluster status,
  'autostop': (int) idle time before autostop,
  'to_down': (bool) whether autodown is used instead of autostop,
  'metadata': (dict) metadata of the cluster,
  'user_hash': (str) user hash of the cluster owner,
  'user_name': (str) user name of the cluster owner,
  'resources_str': (str) the resource string representation of the
    cluster,
}

sky.autostop#

sky.autostop(cluster_name, idle_minutes, down=False)[源代码]

为集群调度自动停止/自动销毁。

自动停止/自动销毁将在集群空闲指定时长(集群作业队列中没有进行中(等待中/运行中)的作业)后自动停止或销毁集群。

集群的空闲时间在以下情况时重置为零:

  • 提交作业(sky.launch()sky.exec())。

  • 集群已重启。

  • 在没有活动设置时设置自动停止。(即,从未设置过自动停止设置,或先前的自动停止设置已取消。)这对于重新启动自动停止计时器很有用。

示例:假设一个未设置任何自动停止的集群已空闲 1 小时,然后设置了 30 分钟的自动停止。集群不会立即自动停止。相反,空闲计时器仅在设置自动停止设置后才开始计数。

当为同一集群指定多个自动停止设置时,最后一个设置优先。

参数:
  • cluster_name (str) – 集群名称。

  • idle_minutes (int) – 集群空闲(没有等待中/运行中作业)多少分钟后将自动停止。设置为负数将取消任何自动停止/自动销毁设置。

  • down (bool) – 如果为 true,则使用自动销毁(销毁集群;不可重启),而不是自动停止(可重启)。

返回类型:

str

返回:

自动停止请求的请求 ID。

请求返回:

None

请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend 或集群是 TPU VM Pod。

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

作业 SDK#

集群作业 SDK#

sky.exec#

sky.exec(task, cluster_name=None, dryrun=False, down=False, backend=None)[源代码]

在现有集群上执行任务。

此函数执行两个操作:

  1. workdir 同步(如果任务指定了 workdir);

  2. 执行任务的 run 命令。

所有其他步骤(资源供应、setup 命令、文件挂载同步)都会跳过。如果任务中更改了任何这些规格,此函数不会反映这些更改。要确保集群的 setup 是最新的,请改用 sky.launch()

执行和调度行为

  • 任务将经过作业队列调度,并遵守任何指定的资源要求。它可以在集群中资源充足的任何节点上执行。

  • 任务在 workdir 下运行(如果指定)。

  • 任务以非交互模式运行(没有伪终端或 pty),因此像 htop 这样的交互式命令不起作用。请改用 ssh my_cluster

参数:
  • task (Union[Task, Dag]) – 包含要执行任务的 sky.Task 或 sky.Dag(实验性;仅限单个任务)。

  • cluster_name (Optional[str]) – 要执行任务的现有集群名称。

  • dryrun (bool) – 如果为 True,则不实际执行任务。

  • down (bool) – 所有作业完成(成功或异常)后销毁集群。如果同时设置了 –idle-minutes-to-autostop,集群将在指定的空闲时间后销毁。请注意,如果在资源供应/数据同步/设置过程中发生错误,集群将不会被销毁,以便于调试。

  • backend (Optional[Backend]) – 要使用的后端。如果为 None,则使用默认后端 (CloudVMRayBackend)。

返回类型:

str

返回:

exec 请求的请求 ID。

请求返回:
  • job_id (Optional[int]) – 提交作业的作业 ID。如果后端不是 CloudVmRayBackend 或未向集群提交作业,则为 None。

  • handle (Optional[backends.ResourceHandle]) – 集群句柄。如果为 dryrun,则为 None。

请求引发异常:
  • ValueError – 如果指定的集群不在 UP 状态。

  • sky.exceptions.ClusterDoesNotExist – 如果指定的集群不存在。

  • sky.exceptions.NotSupportedError – 如果指定的集群是控制器,且不支持此操作。

sky.queue#

sky.queue(cluster_name, skip_finished=False, all_users=False)[源代码]

获取集群的作业队列。

参数:
  • cluster_name (str) – 集群名称。

  • skip_finished (bool) – 如果为 True,则跳过已完成的作业。

  • all_users (bool) – 如果为 True,则返回所有用户的作业。

返回类型:

str

返回:

queue 请求的请求 ID。

请求返回:

job_records (List[Dict[str, Any]]) – 队列中每个作业的字典列表。

[
    {
        'job_id': (int) job id,
        'job_name': (str) job name,
        'username': (str) username,
        'user_hash': (str) user hash,
        'submitted_at': (int) timestamp of submitted,
        'start_at': (int) timestamp of started,
        'end_at': (int) timestamp of ended,
        'resources': (str) resources,
        'status': (job_lib.JobStatus) job status,
        'log_path': (str) log path,
    }
]
请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

  • sky.exceptions.CommandError – 如果通过 ssh 获取作业队列失败。

sky.job_status#

sky.job_status(cluster_name, job_ids=None)[源代码]

获取集群上作业的状态。

参数:
  • cluster_name (str) – 集群名称。

  • job_ids (Optional[List[int]]) – 作业 ID 列表。如果为 None,则获取最后一个作业的状态。

返回类型:

str

返回:

作业状态请求的请求 ID。

请求返回:

job_statuses (Dict[Optional[int], Optional[job_lib.JobStatus]]) – 作业 ID 到作业状态的映射。如果作业不存在,状态将为 None。如果 job_ids 为 None 且集群上没有作业,将返回 {None: None}。

请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

sky.tail_logs#

sky.tail_logs(cluster_name, job_id, follow, tail=0, output_stream=None)[源代码]

跟踪作业日志。

参数:
  • cluster_name (str) – 集群名称。

  • job_id (Optional[int]) – 作业 ID。

  • follow (bool) – 如果为 True,则跟随日志。否则,立即返回日志。

  • tail (int) – 如果 > 0,则显示日志的最后 N 行。

  • output_stream (Optional[TextIOBase]) – 将日志写入的流。如果为 None,则打印到控制台。

返回类型:

int

返回:

根据作业成功或失败的退出码。成功为 0,作业失败为 100。有关可能的退出码,请参阅 exceptions.JobExitCode。

请求引发异常:
  • ValueError – 如果参数无效或不支持该集群。

  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend。

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

sky.download_logs#

sky.download_logs(cluster_name, job_ids)[源代码]

下载作业日志。

参数:
  • cluster_name (str) – (str) 集群名称。

  • job_ids (Optional[List[str]]) – (List[str]) 作业 ID 列表。

返回类型:

Dict[str, str]

返回:

download_logs 请求的请求 ID。

请求返回:

job_log_paths (Dict[str, str]) – 作业 ID 到本地日志路径的映射。

请求引发异常:
  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend。

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

sky.cancel#

sky.cancel(cluster_name, all=False, all_users=False, job_ids=None, _try_cancel_if_cluster_is_init=False)[源代码]

取消集群上的作业。

参数:
  • cluster_name (str) – 集群名称。

  • all (bool) – 如果为 True,则取消所有作业。

  • all_users (bool) – 如果为 True,则取消所有用户的作业。

  • job_ids (Optional[List[int]]) – 要取消的作业 ID 列表。

  • _try_cancel_if_cluster_is_init (bool) – (bool) 是否尝试取消作业,即使集群未处于 UP 状态,但头节点仍然存活。作业控制器使用此标志在 Spot 集群中工作节点被抢占时取消作业。

返回类型:

str

返回:

取消请求的请求 ID。

请求返回:

None

请求引发异常:
  • ValueError – 如果参数无效。

  • sky.exceptions.ClusterDoesNotExist – 如果集群不存在。

  • sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。

  • sky.exceptions.NotSupportedError – 如果指定的集群是控制器,且不支持此操作。

  • sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。

  • sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。

托管作业 SDK#

sky.jobs.launch#

sky.jobs.launch(task, name=None, _need_confirmation=False)[源代码]

启动托管作业。

请参阅 sky.cli.job_launch 获取文档。

参数:
  • task (Union[Task, Dag]) – 要作为托管作业启动的 sky.Task 或 sky.Dag(实验性;仅限单个任务)。

  • name (Optional[str]) – 托管作业的名称。

  • _need_confirmation (bool) – (仅内部使用) 是否在启动作业前显示确认提示。

返回类型:

str

返回:

启动请求的请求 ID。

请求返回:
  • job_id (Optional[int]) – 托管作业的作业 ID。

  • controller_handle (Optional[ResourceHandle]) – 控制器的 ResourceHandle。

请求引发异常:
  • ValueError – 集群不存在。或者,入口点不是有效的链式 dag。

  • sky.exceptions.NotSupportedError – 不支持该功能。

sky.jobs.queue#

sky.jobs.queue(refresh, skip_finished=False, all_users=False)[源代码]

获取托管作业的状态。

请参阅 sky.cli.job_queue 获取文档。

参数:
  • refresh (bool) – 是否在作业控制器停止时重新启动它。

  • skip_finished (bool) – 是否跳过已完成的作业。

  • all_users (bool) – 是否显示所有用户的作业。

返回类型:

str

返回:

queue 请求的请求 ID。

请求返回:

job_records (List[Dict[str, Any]]) – 一个字典列表,每个字典包含一个作业的信息。

[
  {
    'job_id': (int) job id,
    'job_name': (str) job name,
    'resources': (str) resources of the job,
    'submitted_at': (float) timestamp of submission,
    'end_at': (float) timestamp of end,
    'duration': (float) duration in seconds,
    'recovery_count': (int) Number of retries,
    'status': (sky.jobs.ManagedJobStatus) of the job,
    'cluster_resources': (str) resources of the cluster,
    'region': (str) region of the cluster,
  }
]
请求引发异常:
  • sky.exceptions.ClusterNotUpError – 作业控制器未启动或不存在。

  • RuntimeError – 如果通过 ssh 获取托管作业失败。

sky.jobs.cancel#

sky.jobs.cancel(name=None, job_ids=None, all=False, all_users=False)[源代码]

取消托管作业。

请参阅 sky.cli.job_cancel 获取文档。

参数:
  • name (Optional[str]) – 要取消的托管作业名称。

  • job_ids (Optional[List[int]]) – 要取消的托管作业 ID 列表。

  • all (bool) – 是否取消所有托管作业。

  • all_users (bool) – 是否取消所有用户的托管作业。

返回类型:

str

返回:

取消请求的请求 ID。

请求引发异常:
  • sky.exceptions.ClusterNotUpError – 作业控制器未启动。

  • RuntimeError – 取消作业失败。

sky.jobs.tail_logs#

sky.jobs.tail_logs(name=None, job_id=None, follow=True, controller=False, refresh=False, output_stream=None)[源代码]

跟踪托管作业日志。

您可以提供作业名称或作业 ID 来跟踪日志。如果两者都未提供,将显示最新作业的日志。

参数:
  • name (Optional[str]) – 要跟踪日志的托管作业名称。

  • job_id (Optional[int]) – 要跟踪日志的托管作业 ID。

  • follow (bool) – 是否跟随日志。

  • controller (bool) – 是否跟踪作业控制器的日志。

  • refresh (bool) – 是否在作业控制器停止时重新启动它。

  • output_stream (Optional[TextIOBase]) – 将日志写入的流。如果为 None,则打印到控制台。

返回类型:

int

返回:

根据作业成功或失败的退出码。成功为 0,作业失败为 100。有关可能的退出码,请参阅 exceptions.JobExitCode。

请求引发异常:
  • ValueError – 参数无效。

  • sky.exceptions.ClusterNotUpError – 作业控制器未启动。

服务 SDK#

sky.serve.up#

sky.serve.up(task, service_name, _need_confirmation=False)[源代码]

启动服务。

请参阅 sky.cli.serve_up 获取文档。

参数:
  • task (Union[Task, Dag]) – 要启动服务的 sky.Task。

  • service_name (str) – 服务的名称。

  • _need_confirmation (bool) – (仅内部使用) 是否在启动服务前显示确认提示。

返回类型:

str

返回:

up 请求的请求 ID。

请求返回:
  • service_name (str) – 服务的名称。与作为参数传递的相同。

  • endpoint (str) – 服务端点。

sky.serve.update#

sky.serve.update(task, service_name, mode, _need_confirmation=False)[源代码]

更新现有服务。

请参阅 sky.cli.serve_update 获取文档。

参数:
  • task (Union[Task, Dag]) – 要更新的 sky.Task。

  • service_name (str) – 服务的名称。

  • mode (UpdateMode) – 更新模式,包括:- sky.serve.UpdateMode.ROLLING - sky.serve.UpdateMode.BLUE_GREEN。

  • _need_confirmation (bool) – (仅内部使用) 是否在更新服务前显示确认提示。

返回类型:

str

返回:

update 请求的请求 ID。

请求返回:

None

sky.serve.down#

sky.serve.down(service_names, all=False, purge=False)[源代码]

销毁服务。

请参阅 sky.cli.serve_down 获取文档。

参数:
  • service_names (Union[List[str], str, None]) – 服务的名称。

  • all (bool) – 是否终止所有服务。

  • purge (bool) – 是否终止处于失败状态的服务。这些服务可能导致资源泄露。

返回类型:

str

返回:

销毁请求的请求 ID。

请求返回:

None

请求引发异常:
  • sky.exceptions.ClusterNotUpError – 如果 Sky Serve 控制器未启动。

  • ValueError – 如果参数无效。

  • RuntimeError – 如果终止服务失败。

sky.serve.terminate_replica#

sky.serve.terminate_replica(service_name, replica_id, purge)[源代码]

销毁给定服务的特定副本。

参数:
  • service_name (str) – 服务的名称。

  • replica_id (int) – 要终止副本的 ID。

  • purge (bool) – 是否终止处于失败状态的副本。这些副本可能导致资源泄露,因此我们需要用户明确指定此标志,以确保他们了解这种潜在的资源泄露。

返回类型:

str

返回:

终止副本请求的请求 ID。

请求引发异常:
  • sky.exceptions.ClusterNotUpError – 如果 Sky Serve 控制器未启动。

  • RuntimeError – 如果终止副本失败。

sky.serve.status#

sky.serve.status(service_names)[源代码]

获取服务状态。

如果指定了 service_names,则返回这些服务。否则,返回所有服务。

每个返回值具有以下字段:

{
    'name': (str) service name,
    'active_versions': (List[int]) a list of versions that are active,
    'controller_job_id': (int) the job id of the controller,
    'uptime': (int) uptime in seconds,
    'status': (sky.ServiceStatus) service status,
    'controller_port': (Optional[int]) controller port,
    'load_balancer_port': (Optional[int]) load balancer port,
    'endpoint': (Optional[str]) endpoint of the service,
    'policy': (Optional[str]) autoscaling policy description,
    'requested_resources_str': (str) str representation of
      requested resources,
    'load_balancing_policy': (str) load balancing policy name,
    'replica_info': (List[Dict[str, Any]]) replica information,
}

replica_info 中的每个条目具有以下字段:

{
    'replica_id': (int) replica id,
    'name': (str) replica name,
    'status': (sky.serve.ReplicaStatus) replica status,
    'version': (int) replica version,
    'launched_at': (int) timestamp of launched,
    'handle': (ResourceHandle) handle of the replica cluster,
    'endpoint': (str) endpoint of the replica,
}

有关可能的服务状态和副本状态,请参阅 sky.cli.serve_status。

参数:

service_names (Union[List[str], str, None]) – 要查询的单个服务名称或服务名称列表。如果为 None,则查询所有服务。

返回类型:

str

返回:

状态请求的请求 ID。

请求返回:

service_records (List[Dict[str, Any]]) – 一个字典列表,每个字典包含一个服务的信息。如果未找到服务,将从返回列表中省略。

请求引发异常:
  • RuntimeError – 如果获取服务状态失败。

  • exceptions.ClusterNotUpError – 如果 Sky Serve 控制器未启动。

sky.serve.tail_logs#

sky.serve.tail_logs(service_name, target, replica_id=None, follow=True, output_stream=None)[源代码]

跟踪服务日志。

用法

sky.serve.tail_logs(
    service_name,
    target=<component>,
    follow=False, # Optionally, default to True
    # replica_id=3, # Must be specified when target is REPLICA.
)

target 是一个枚举,类型为 sky.serve.ServiceComponent,可以是以下之一:

  • sky.serve.ServiceComponent.CONTROLLER

  • sky.serve.ServiceComponent.LOAD_BALANCER

  • sky.serve.ServiceComponent.REPLICA

也支持将 target 作为小写字符串传递,例如 target='controller'。要使用 sky.serve.ServiceComponent.REPLICA,您必须指定 replica_id

跟踪控制器日志

# follow default to True
sky.serve.tail_logs(
    service_name, target=sky.serve.ServiceComponent.CONTROLLER
)

打印副本 3 的日志

# Pass target as a lower-case string is also supported.
sky.serve.tail_logs(
    service_name, target='replica',
    follow=False, replica_id=3
)
参数:
  • service_name (str) – 服务的名称。

  • target (Union[str, ServiceComponent]) – 要跟踪日志的组件。

  • replica_id (Optional[int]) – 要跟踪日志的副本 ID。

  • follow (bool) – 是否跟随日志。

  • output_stream (Optional[TextIOBase]) – 将日志写入的流。如果为 None,则打印到控制台。

返回类型:

None

返回:

tail logs 请求的请求 ID。

请求引发异常:
  • sky.exceptions.ClusterNotUpError – Sky Serve 控制器未启动。

  • ValueError – 参数无效,或跟踪日志失败。

Task#

class sky.Task(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None, file_mounts_mapping=None)[源代码]#

Task:要在云上运行的计算任务。

__init__(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None, file_mounts_mapping=None)[source]#

初始化一个 Task。

所有字段都是可选的。Task.run 是实际的程序:它可以是要运行的 shell 命令(字符串),或者是用于不同节点的命令生成器(lambda;见下文)。

可选地,调用 Task.set_resources() 来设置此任务的资源要求。如果未设置,则假定为默认的仅 CPU 要求(与 sky launch 相同)。

此类中的所有设置方法 Task.set_*() 都返回 self,这意味着它们是流畅的 API,可以链式调用。

示例

# A Task that will sync up local workdir '.', containing
# requirements.txt and train.py.
sky.Task(setup='pip install requirements.txt',
         run='python train.py',
         workdir='.')

# An empty Task for provisioning a cluster.
task = sky.Task(num_nodes=n).set_resources(...)

# Chaining setters.
sky.Task().set_resources(...).set_file_mounts(...)
参数:
  • name (Optional[str]) – 用于显示目的的任务的字符串名称。

  • setup (Optional[str]) – 一个 setup 命令,它将在执行 run 运行命令之前运行,并在 workdir 下执行。

  • run (Union[str, Callable[[int, List[str]], Optional[str]], None]) – 任务的实际命令。如果不是 None,则可以是 shell 命令(字符串)或命令生成器(可调用对象)。如果是后者,它必须接受节点排序和节点地址列表作为输入,并返回一个 shell 命令(字符串)(某些节点返回 None 是有效的,在这种情况下不会在这些节点上运行任何命令)。运行命令将在 workdir 下执行。注意,命令生成器应该是一个自包含的 lambda 表达式。

  • envs (Optional[Dict[str, str]]) – 一个字典,包含在运行 setup 和 run 命令之前要设置的环境变量。

  • workdir (Optional[str]) – 本地工作目录。此目录将同步到远程 VM(s) 上的一个位置,并且 setuprun 命令将在该位置下运行(因此,它们在调用二进制文件时可以使用相对路径)。

  • num_nodes (Optional[int]) – 为此任务配置的节点数。如果为 None,则视为 1 个节点。如果 > 1,每个节点将执行其自己的 setup/run 命令,其中 run 可以是一个字符串(表示所有节点获取相同的命令),也可以是一个 lambda(语义如上所述)。

  • docker_image (Optional[str]) – (实验性:仅在使用 LocalDockerBackend 时生效。)此任务将基于的基础 docker 镜像。默认为 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'。

  • blocked_resources (Optional[Iterable[Resources]]) – 此任务不能在其上运行的一组资源。

static from_yaml(yaml_path)[source]#

从 task YAML 文件初始化一个任务。

示例

task = sky.Task.from_yaml('/path/to/task.yaml')
参数:

yaml_path (str) – 有效 task yaml 文件的文件路径。

Raises:

ValueError – 如果路径加载后是一个字符串而不是一个字典;或者存在任何其他解析错误。

返回类型:

Task

update_envs(envs)[source]#

更新 setup/run 命令中使用的环境变量。

参数:

envs (Union[None, List[Tuple[str, str]], Dict[str, str]]) – (可选)一个 (env_name, value) 列表或一个 {env_name: value} 字典。

返回类型:

Task

返回:

self – 当前任务,环境变量已更新。

Raises:

ValueError – 如果检测到各种无效输入错误。

set_resources(resources)[source]#

设置执行此任务所需的资源。

如果此函数未为此 Task 调用,则将使用默认资源要求(8 个 vCPU)。

参数:

resources (Union[Resources, List[Resources], Set[Resources]]) – 可以是一个 sky.Resources 对象,一组 Resources 对象,或一个 Resources 对象列表。一组或一个列表资源是要求优化器“选择其中最佳的资源”来运行此任务。

返回类型:

Task

返回:

self – 当前任务,资源已设置。

set_resources_override(override_params)[source]#

设置资源的覆盖参数。

返回类型:

Task

set_service(service)[source]#

为此任务设置服务规范。

参数:

service (Optional[SkyServiceSpec]) – 一个 SkyServiceSpec 对象。

返回类型:

Task

返回:

self – 当前任务,服务已设置。

set_file_mounts(file_mounts)[source]#

为此任务设置文件挂载。

对于同步数据集、点文件等非常有用。

文件挂载是一个字典:{remote_path: local_path/cloud URI}。本地(或云端)文件/目录将同步到此任务将运行的远程 VM(s) 上的指定路径。

源路径和目标路径都不能以斜杠结尾。

示例

task.set_file_mounts({
    '~/.dotfile': '/local/.dotfile',
    # /remote/dir/ will contain the contents of /local/dir/.
    '/remote/dir': '/local/dir',
})
参数:

file_mounts (Optional[Dict[str, str]]) – 一个可选的字典,格式为 {remote_path: local_path/cloud URI},其中 remote 表示此任务最终将在其上运行的 VM(s),local 表示任务启动的节点。

返回类型:

Task

返回:

self – 当前任务,文件挂载已设置。

update_file_mounts(file_mounts)[source]#

更新此任务的文件挂载。

与 set_file_mounts() 不同,此函数是更新到现有的 file_mounts 中(调用 dict.update()),而不是覆盖它。

应在配置之前调用此函数才能生效。

示例

task.update_file_mounts({
    '~/.config': '~/Documents/config',
    '/tmp/workdir': '/local/workdir/cnn-cifar10',
})
参数:

file_mounts (Dict[str, str]) – 一个可选的字典,格式为 {remote_path: local_path/cloud URI},其中 remote 表示此任务最终将在其上运行的 VM(s),local 表示任务启动的节点。

返回类型:

Task

返回:

self – 当前任务,文件挂载已更新。

Raises:

ValueError – 如果输入路径无效。

set_storage_mounts(storage_mounts)[source]#

为此任务设置存储挂载。

存储挂载是一个字典:{mount_path: sky.Storage object},其中每个条目都将一个 sky.Storage 对象(云对象存储桶)挂载到远程集群内的某个路径。

可以通过从本地目录上传(设置 source)来创建 sky.Storage 对象,或者由现有云存储桶支持(将 name 设置为存储桶名称;或将 source 设置为存储桶 URI)。

示例

task.set_storage_mounts({
    '/remote/imagenet/': sky.Storage(name='my-bucket',
                                     source='/local/imagenet'),
})
参数:

storage_mounts (Optional[Dict[str, Storage]]) – 一个可选的字典,格式为 {mount_path: sky.Storage object},其中 mount_path 是 Storage 对象将挂载到的远程 VM(s) 内部的路径。

返回类型:

Task

返回:

self – 当前任务,存储挂载已设置。

Raises:

ValueError – 如果输入路径无效。

update_storage_mounts(storage_mounts)[source]#

更新此任务的存储挂载。

与 set_storage_mounts() 不同,此函数是更新到现有的 storage_mounts 中(调用 dict.update()),而不是覆盖它。

应在配置之前调用此函数才能生效。

参数:

storage_mounts (Dict[str, Storage]) – 一个可选的字典,格式为 {mount_path: sky.Storage object},其中 mount_path 是 Storage 对象将挂载到的远程 VM(s) 内部的路径。

返回类型:

Task

返回:

self – 当前任务,存储挂载已更新。

Raises:

ValueError – 如果输入路径无效。

资源#

class sky.Resources(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, autostop=None, _docker_login_config=None, _docker_username_for_runpod=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#

资源:任务的计算要求。

此类一旦创建便是不可变的(以确保在属性更改时进行一些验证)。要更新 Resources 实例的属性,请使用 resources.copy(**new_properties)

用于

  • 表示任务/应用的资源请求

  • 作为获取具体可启动实例的“过滤器”

  • 计算计费

  • 在云上进行配置

__init__(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, autostop=None, _docker_login_config=None, _docker_username_for_runpod=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#

初始化一个 Resources 对象。

所有字段都是可选的。Resources.is_launchable 决定了 Resources 是否完全指定以启动实例。

示例

# Fully specified cloud and instance type (is_launchable() is True).
sky.Resources(clouds.AWS(), 'p3.2xlarge')
sky.Resources(clouds.GCP(), 'n1-standard-16')
sky.Resources(clouds.GCP(), 'n1-standard-8', 'V100')

# Specifying required resources; the system decides the
# cloud/instance type. The below are equivalent:
sky.Resources(accelerators='V100')
sky.Resources(accelerators='V100:1')
sky.Resources(accelerators={'V100': 1})
sky.Resources(cpus='2+', memory='16+', accelerators='V100')
参数:
  • cloud (Optional[Cloud]) – 要使用的云。

  • instance_type (Optional[str]) – 要使用的实例类型。

  • cpus (Union[None, int, float, str]) – 任务所需的 CPU 数量。如果为字符串,则必须是 '2''2+' 形式的字符串,其中 + 表示任务至少需要 2 个 CPU。

  • memory (Union[None, int, float, str]) – 所需内存量,单位为 GiB。如果为字符串,则必须是 '16''16+' 形式的字符串,其中 + 表示任务至少需要 16 GB 内存。

  • accelerators (Union[None, str, Dict[str, Union[int, float]]]) – 所需的加速器。如果为字符串,则必须是 'V100''V100:2' 形式的字符串,其中 :2 表示任务需要 2 个 V100 GPU。如果为字典,则必须是 {'V100': 2}{'tpu-v2-8': 1} 形式的字典。

  • accelerator_args (Optional[Dict[str, str]]) – 加速器特定参数。例如,对于 TPU,为 {'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}

  • use_spot (Optional[bool]) – 是否使用 Spot 实例。如果为 None,则默认为 False。

  • job_recovery (Union[Dict[str, Union[int, str, None]], str, None]) –

    用于托管作业从抢占中恢复集群的作业恢复策略。请参考 recovery_strategy module # pylint: disable=line-too-long 了解更多详情。当提供字典时,它可以包含以下字段

    • strategy: 要使用的恢复策略。

    • max_restarts_on_errors: 用户代码错误时的最大重启次数。

  • region (Optional[str]) – 要使用的区域。

  • zone (Optional[str]) – 要使用的区域。

  • image_id (Union[Dict[Optional[str], str], str, None]) –

    要使用的镜像 ID。如果为字符串,则必须是云提供的镜像 ID 字符串,例如 AWS:'ami-1234567890abcdef0',GCP:'projects/my-project-id/global/images/my-image-name';或者 SkyPilot 提供的镜像标签,例如 AWS:'skypilot:gpu-ubuntu-2004'。如果为字典,则必须是将区域映射到镜像 ID 的字典,例如

    {
      'us-west1': 'ami-1234567890abcdef0',
      'us-east1': 'ami-1234567890abcdef0'
    }
    

  • disk_size (Optional[int]) – 操作系统磁盘的大小,单位为 GiB。

  • disk_tier (Union[str, DiskTier, None]) – 要使用的磁盘性能层级。如果为 None,则默认为 'medium'

  • ports (Union[int, str, List[str], Tuple[str], None]) – 要在实例上打开的端口。

  • labels (Optional[Dict[str, str]]) – 要应用于实例的标签。这些标签对于分配可供外部工具使用的元数据非常有用。实现取决于所选择的云 - 在 AWS 上,标签映射到实例标签 (instance tags)。在 GCP 上,标签映射到实例标签 (instance labels)。在 Kubernetes 上,标签映射到 Pod 标签 (pod labels)。在其他云上,不支持标签并将被忽略。

  • autostop (Union[bool, int, Dict[str, Any], None]) – 要使用的自动停止配置。对于已启动的资源,可能与实际的当前自动停止配置不对应。

  • _docker_login_config (Optional[DockerLoginConfig]) – 要使用的 docker 配置。这包括 docker 用户名、密码和注册表服务器。如果为 None,则跳过 docker 登录。

  • _docker_username_for_runpod (Optional[str]) – docker 容器的登录用户名。RunPod 使用此用户名设置 docker 容器的 ssh 用户。

  • _requires_fuse (Optional[bool]) – 任务是否需要 FUSE 挂载支持。某些云实现内部使用此参数进行 FUSE 挂载的额外设置。此标志还防止在不支持 FUSE 挂载的现有集群上使用 FUSE 挂载。如果为 None,则默认为 False。

Raises:
  • ValueError – 如果某些属性无效。

  • exceptions.NoCloudAccessError – 如果没有启用公共云。

copy(**override)[source]#

返回给定 Resources 的副本。

返回类型:

资源

枚举#

class sky.ClusterStatus(value)[source]#

本地缓存中记录的集群状态。

这可能与实际集群状态不同,可以通过运行 sky status --refresh 来刷新。

INIT = 'INIT'#

正在初始化。

这意味着配置已开始但尚未成功完成。集群可能正在进行 setup,可能 setup 失败,可能处于活动状态或已关闭。

UP = 'UP'#

集群已启动。这意味着之前的配置已成功完成。

STOPPED = 'STOPPED'#

集群已停止。

class sky.JobStatus(value)[source]#

作业状态枚举。

INIT = 'INIT'#

作业已提交,但尚未开始。

PENDING = 'PENDING'#

作业正在等待所需的资源。

SETTING_UP = 'SETTING_UP'#

作业正在运行用户的 setup 脚本。

RUNNING = 'RUNNING'#

作业正在运行。

FAILED_DRIVER = 'FAILED_DRIVER'#

作业驱动程序进程失败。

SUCCEEDED = 'SUCCEEDED'#

作业成功完成。

FAILED = 'FAILED'#

由于用户代码,作业失败。

FAILED_SETUP = 'FAILED_SETUP'#

作业 setup 失败。

CANCELLED = 'CANCELLED'#

作业被用户取消。

class sky.StatusRefreshMode(value)[source]#

刷新集群状态的模式。

NONE = 'NONE'#

不刷新任何集群。

AUTO = 'AUTO'#

仅刷新设置了自动停止或具有 Spot 实例的集群。

FORCE = 'FORCE'#

强制刷新所有集群。

API 服务器 SDK#

sky.get#

sky.get(request_id)[source]

等待并获取请求的结果。

参数:

request_id (str) – 要获取结果的请求 ID。

返回类型:

Any

返回:

指定请求的 Request Returns。有关更多详细信息,请参阅上面特定请求的文档。

Raises:

Exception – 它会引发与特定请求相同的异常,详见上面特定请求文档中的 Request Raises

sky.stream_and_get#

sky.stream_and_get(request_id=None, log_path=None, tail=None, follow=True, output_stream=None)[source]

流式传输请求或日志文件的日志并获取最终结果。

这将阻塞直到请求完成。请求 ID 可以是完整请求 ID 的前缀。

参数:
  • request_id (Optional[str]) – 要流式传输的请求的请求 ID 前缀。

  • log_path (Optional[str]) – 要流式传输的日志文件的路径。

  • tail (Optional[int]) – 显示日志末尾的行数。如果为 None,则显示所有日志。

  • follow (bool) – 是否跟随日志。

  • output_stream (Optional[TextIOBase]) – 要写入的输出流。如果为 None,则打印到控制台。

返回类型:

Any

返回:

指定请求的 Request Returns。有关更多详细信息,请参阅上面特定请求的文档。

Raises:

Exception – 它会引发与特定请求相同的异常,详见上面特定请求文档中的 Request Raises

sky.api_status#

sky.api_status(request_ids=None, all_status=False)[source]

列出所有请求。

参数:
  • request_ids (Optional[List[str]]) – 要查询的请求的请求 ID 前缀。如果为 None,则查询所有请求。

  • all_status (bool) – 是否也列出所有已完成的请求。如果 request_ids 不为 None,则忽略此参数。

返回类型:

List[RequestPayload]

返回:

请求负载列表。

sky.api_cancel#

sky.api_cancel(request_ids=None, all_users=False, silent=False)[source]

中止请求或所有请求。

参数:
  • request_ids (Union[List[str], str, None]) – 要中止的请求 ID。可以是单个字符串或字符串列表。

  • all_users (bool) – 是否中止所有用户的请求。

  • silent (bool) – 是否抑制输出。

返回类型:

str

返回:

中止请求本身的请求 ID。

请求返回:

被取消的请求 ID 列表。

Raises:

click.BadParameter – 如果未指定请求 ID,且未设置 all 或 all_users。

sky.api_info#

sky.api_info()[source]

获取服务器的状态、提交和版本信息。

返回类型:

Dict[str, str]

返回:

包含服务器状态、提交和版本信息的字典。

{
    'status': 'healthy',
    'api_version': '1',
    'commit': 'abc1234567890',
    'version': '1.0.0',
}

sky.api_start#

sky.api_start(*, deploy=False, host='127.0.0.1', foreground=False)[source]

启动 API 服务器。

它检查 API 服务器是否存在,如果不存在则启动它。

参数:
  • deploy (bool) – 是否部署 API 服务器,即充分利用机器资源。

  • host (str) – 部署 API 服务器的主机。如果 deploy 为 True,将设置为 0.0.0.0,以允许远程访问。

  • foreground (bool) – 是否在前台运行 API 服务器(在当前进程中运行)。

返回类型:

None

返回:

None

sky.api_stop#

sky.api_stop()[source]

停止 API 服务器。

如果 API 服务器是远程托管的,此操作将无效。

返回类型:

None

返回:

None

sky.api_server_logs#

sky.api_server_logs(follow=True, tail=None)[source]

流式传输 API 服务器日志。

参数:
  • follow (bool) – 是否跟随日志。

  • tail (Optional[int]) – 从日志末尾显示的行数。如果为 None,则显示所有日志。

返回类型:

None

返回:

None