Python SDK#
SkyPilot 提供了一个 Python SDK,CLI 在底层使用了它。
大多数 SDK 调用是**异步的并返回一个 future**(request ID
请求 ID)。
等待并获取结果
sky.get(request_id): 等待请求完成,并获取结果或异常。
sky.stream_and_get(request_id): 流式输出请求日志,并获取结果或异常。
管理异步请求
sky.api_status(): 列出所有请求及其状态。
sky.api_cancel(request_id): 取消请求。
有关更多详细信息,请参阅每个 API 的Request Returns
(请求返回)和Request Raises
(请求引发)部分。
集群 SDK#
sky.launch
#
- sky.launch(task, cluster_name=None, retry_until_up=False, idle_minutes_to_autostop=None, dryrun=False, down=False, backend=None, optimize_target=OptimizeTarget.COST, no_setup=False, clone_disk_from=None, fast=False, _need_confirmation=False, _is_launched_by_jobs_controller=False, _is_launched_by_sky_serve_controller=False, _disable_controller_check=False)[源代码]
启动集群或任务。
任务的 setup(设置)和 run(运行)命令在任务的 workdir 下执行(如果指定,它会同步到远程集群)。任务将在集群上进行作业队列调度。
当前,第一个参数必须是 sky.Task,或者(实验性高级用法)sky.Dag。后一种情况目前必须只包含一个单个任务;对 pipeline/通用 DAG 的支持仍在实验性分支中。
示例
import sky task = sky.Task(run='echo hello SkyPilot') task.set_resources( sky.Resources(cloud=sky.AWS(), accelerators='V100:4')) sky.launch(task, cluster_name='my-cluster')
- 参数:
task (
Union
[Task
,Dag
]) – 要启动的 sky.Task 或 sky.Dag(实验性;仅限单个任务)。cluster_name (
Optional
[str
]) – 要创建/重用集群的名称。如果为 None,则自动生成名称。retry_until_up (
bool
) – 是否重试启动集群直到它启动。idle_minutes_to_autostop (
Optional
[int
]) – 集群空闲(即集群作业队列中没有运行或等待中的作业)达到指定分钟数后自动停止。作业队列中发现 setting-up/运行/等待中的作业时,空闲计时器会重置。设置此标志等同于运行sky.launch()
然后运行sky.autostop(idle_minutes=<minutes>)
。如果未设置,集群将不会自动停止。dryrun (
bool
) – 如果为 True,则不实际启动集群。down (
bool
) – 所有作业完成(成功或异常)后销毁集群。如果同时设置了 –idle-minutes-to-autostop,集群将在指定的空闲时间后销毁。请注意,如果在资源供应/数据同步/设置过程中发生错误,集群将不会被销毁,以便于调试。backend (
Optional
[Backend
]) – 要使用的后端。如果为 None,则使用默认后端 (CloudVMRayBackend)。optimize_target (
OptimizeTarget
) – 要优化的目标。选项:OptimizeTarget.COST(成本),OptimizeTarget.TIME(时间)。no_setup (
bool
) – 如果为 True,则不重新运行 setup 命令。clone_disk_from (
Optional
[str
]) – [实验性] 如果设置,则从指定的集群克隆磁盘。这对于将集群迁移到不同的可用区或区域非常有用。fast (
bool
) – [实验性] 如果集群已启动并可用,则跳过资源供应和 setup 步骤。_need_confirmation (
bool
) – (仅内部使用) 如果为 True,则显示确认提示。
- 返回类型:
- 返回:
启动请求的请求 ID。
- 请求返回:
job_id (Optional[int]) – 提交作业的作业 ID。如果后端不是 CloudVmRayBackend 或未向集群提交作业,则为 None。
handle (Optional[backends.ResourceHandle]) – 集群句柄。如果为 dryrun,则为 None。
- 请求引发异常:
exceptions.ClusterOwnerIdentityMismatchError – 如果集群归另一用户所有。
exceptions.InvalidClusterNameError – 如果集群名称无效。
exceptions.ResourcesMismatchError – 如果请求的资源与现有集群不匹配。
exceptions.NotSupportedError – 如果后端/云/集群不支持所需功能。
exceptions.ResourcesUnavailableError – 如果请求的资源无法满足。异常的
failover_history
将被设置为:Empty:当第一次
sky.optimize()
未能找到可行资源时;未尝试预检查或实际启动。Non-empty:当至少发生 1 个异常,可能是来自我们的预检查(例如集群名称无效)或某个区域/可用区抛出资源不可用。
exceptions.CommandError – 任何 ssh 命令错误。
exceptions.NoCloudAccessError – 如果所有云提供商都被禁用。
根据后端可能引发其他异常。
sky.stop
#
- sky.stop(cluster_name, purge=False)[源代码]
停止集群。
停止集群时,附加磁盘上的数据不会丢失。实例的计费会停止,但磁盘仍会收费。重新启动集群时,这些磁盘将重新附加。
目前,Spot 实例集群无法停止(GCP 除外,它允许在停止 Spot VM 时保留磁盘内容)。
- 参数:
- 返回类型:
- 返回:
停止请求的请求 ID。
- 请求返回:
None
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
RuntimeError – 停止集群失败。
sky.exceptions.NotSupportedError – 如果指定的集群是 Spot 集群、TPU VM Pod 集群或托管作业控制器。
sky.start
#
- sky.start(cluster_name, idle_minutes_to_autostop=None, retry_until_up=False, down=False, force=False)[源代码]
重启集群。
如果集群之前已停止(状态为 STOPPED)或在资源供应/运行时安装中失败(状态为 INIT),此函数将尝试启动集群。在后一种情况下,将重试资源供应和运行时安装。
重启已停止的集群时,不使用自动故障转移资源供应。集群将在之前选择的相同云、区域和可用区上启动。
如果集群已处于 UP 状态,此函数无效。
- 参数:
cluster_name (
str
) – 要启动集群的名称。idle_minutes_to_autostop (
Optional
[int
]) – 集群空闲(即集群作业队列中没有运行或等待中的作业)达到指定分钟数后自动停止。作业队列中发现 setting-up/运行/等待中的作业时,空闲计时器会重置。设置此标志等同于运行sky.launch()
然后运行sky.autostop(idle_minutes=<minutes>)
。如果未设置,集群将不会自动停止。retry_until_up (
bool
) – 是否重试启动集群直到它启动。down (
bool
) – 自动销毁集群:所有作业完成(成功或异常)后,在指定的空闲时间后销毁集群。需要设置idle_minutes_to_autostop
。force (
bool
) – 是否强制启动集群,即使它已启动。对升级 SkyPilot 运行时很有用。
- 返回类型:
- 返回:
启动请求的请求 ID。
- 请求返回:
None
- 请求引发异常:
ValueError – 参数值无效:(1) 如果
down
设置为 True 但idle_minutes_to_autostop
为 None;(2) 如果指定的集群是托管作业控制器,并且idle_minutes_to_autostop
不为 None 或down
为 True(省略它们以使用默认的自动停止设置)。sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
sky.exceptions.NotSupportedError – 如果要重启的集群是使用不支持此操作的非默认后端启动的。
sky.exceptions.ClusterOwnerIdentitiesMismatchError – 如果要重启的集群是由不同用户启动的。
sky.down
#
- sky.down(cluster_name, purge=False)[源代码]
销毁集群。
销毁集群将删除所有相关资源(所有计费停止),并且附加磁盘上的任何数据都将丢失。集群中的加速器(例如 TPU)也将被删除。
- 参数:
- 返回类型:
- 返回:
销毁请求的请求 ID。
- 请求返回:
None
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
RuntimeError – 销毁集群失败。
sky.exceptions.NotSupportedError – 指定的集群是托管作业控制器。
sky.status
#
- sky.status(cluster_names=None, refresh=StatusRefreshMode.NONE, all_users=False)[源代码]
获取集群状态。
如果指定了 cluster_names,则返回这些集群。否则,返回所有集群。
每个集群可以具有以下状态之一:
INIT
: 集群可能正在运行或已关闭。可能发生在以下情况:正在进行的资源供应或运行时设置。(
sky.launch()
已启动但尚未完成。)或者,集群处于异常状态,例如,某些集群节点已关闭,或 SkyPilot 运行时不健康。(要恢复集群,请再次在其上尝试
sky launch
。)
UP
: 资源供应和运行时设置已成功,并且集群正在运行。(最近的sky.launch()
已成功完成。)STOPPED
: 集群已停止且存储已持久化。使用sky.start()
重启集群。
自动停止列
自动停止列指示集群在空闲(没有作业运行)多少分钟后将自动停止。如果
to_down
为 True,集群将自动销毁而不是自动停止。
获取最新的集群状态
在集群完全由 SkyPilot 管理(即没有在云控制台中进行手动操作)且未使用自动停止的正常情况下,此命令返回的表格将准确反映集群状态。
在集群在 SkyPilot 外部发生更改(例如,在云控制台中进行手动操作;未管理的 Spot 集群被抢占)或对于启用了自动停止的集群,请使用
refresh=True
从云提供商查询最新的集群状态。
- 参数:
cluster_names (
Optional
[List
[str
]]) – 要查询的集群名称列表。如果未提供,将查询所有集群。refresh (
StatusRefreshMode
) – 是否从云提供商查询最新的集群状态。all_users (
bool
) – 是否包含所有用户的集群。默认情况下,仅包含当前用户的集群。
- 返回类型:
- 返回:
状态请求的请求 ID。
- 请求返回:
cluster_records (List[Dict[str, Any]]) – 一个字典列表,每个字典包含一个集群的信息。如果发现集群已终止或未找到,将从返回列表中省略。
{ 'name': (str) cluster name, 'launched_at': (int) timestamp of last launch on this cluster, 'handle': (ResourceHandle) an internal handle to the cluster, 'last_use': (str) the last command/entrypoint that affected this cluster, 'status': (sky.ClusterStatus) cluster status, 'autostop': (int) idle time before autostop, 'to_down': (bool) whether autodown is used instead of autostop, 'metadata': (dict) metadata of the cluster, 'user_hash': (str) user hash of the cluster owner, 'user_name': (str) user name of the cluster owner, 'resources_str': (str) the resource string representation of the cluster, }
sky.autostop
#
- sky.autostop(cluster_name, idle_minutes, down=False)[源代码]
为集群调度自动停止/自动销毁。
自动停止/自动销毁将在集群空闲指定时长(集群作业队列中没有进行中(等待中/运行中)的作业)后自动停止或销毁集群。
集群的空闲时间在以下情况时重置为零:
提交作业(
sky.launch()
或sky.exec()
)。集群已重启。
在没有活动设置时设置自动停止。(即,从未设置过自动停止设置,或先前的自动停止设置已取消。)这对于重新启动自动停止计时器很有用。
示例:假设一个未设置任何自动停止的集群已空闲 1 小时,然后设置了 30 分钟的自动停止。集群不会立即自动停止。相反,空闲计时器仅在设置自动停止设置后才开始计数。
当为同一集群指定多个自动停止设置时,最后一个设置优先。
- 参数:
- 返回类型:
- 返回:
自动停止请求的请求 ID。
- 请求返回:
None
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend 或集群是 TPU VM Pod。
sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
作业 SDK#
集群作业 SDK#
sky.exec
#
- sky.exec(task, cluster_name=None, dryrun=False, down=False, backend=None)[源代码]
在现有集群上执行任务。
此函数执行两个操作:
workdir 同步(如果任务指定了 workdir);
执行任务的
run
命令。
所有其他步骤(资源供应、setup 命令、文件挂载同步)都会跳过。如果任务中更改了任何这些规格,此函数不会反映这些更改。要确保集群的 setup 是最新的,请改用
sky.launch()
。执行和调度行为
任务将经过作业队列调度,并遵守任何指定的资源要求。它可以在集群中资源充足的任何节点上执行。
任务在 workdir 下运行(如果指定)。
任务以非交互模式运行(没有伪终端或 pty),因此像
htop
这样的交互式命令不起作用。请改用ssh my_cluster
。
- 参数:
task (
Union
[Task
,Dag
]) – 包含要执行任务的 sky.Task 或 sky.Dag(实验性;仅限单个任务)。dryrun (
bool
) – 如果为 True,则不实际执行任务。down (
bool
) – 所有作业完成(成功或异常)后销毁集群。如果同时设置了 –idle-minutes-to-autostop,集群将在指定的空闲时间后销毁。请注意,如果在资源供应/数据同步/设置过程中发生错误,集群将不会被销毁,以便于调试。backend (
Optional
[Backend
]) – 要使用的后端。如果为 None,则使用默认后端 (CloudVMRayBackend)。
- 返回类型:
- 返回:
exec 请求的请求 ID。
- 请求返回:
job_id (Optional[int]) – 提交作业的作业 ID。如果后端不是 CloudVmRayBackend 或未向集群提交作业,则为 None。
handle (Optional[backends.ResourceHandle]) – 集群句柄。如果为 dryrun,则为 None。
- 请求引发异常:
ValueError – 如果指定的集群不在 UP 状态。
sky.exceptions.ClusterDoesNotExist – 如果指定的集群不存在。
sky.exceptions.NotSupportedError – 如果指定的集群是控制器,且不支持此操作。
sky.queue
#
- sky.queue(cluster_name, skip_finished=False, all_users=False)[源代码]
获取集群的作业队列。
- 参数:
- 返回类型:
- 返回:
queue 请求的请求 ID。
- 请求返回:
job_records (List[Dict[str, Any]]) – 队列中每个作业的字典列表。
[ { 'job_id': (int) job id, 'job_name': (str) job name, 'username': (str) username, 'user_hash': (str) user hash, 'submitted_at': (int) timestamp of submitted, 'start_at': (int) timestamp of started, 'end_at': (int) timestamp of ended, 'resources': (str) resources, 'status': (job_lib.JobStatus) job status, 'log_path': (str) log path, } ]
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果集群不是基于
CloudVmRayBackend
。sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
sky.exceptions.CommandError – 如果通过 ssh 获取作业队列失败。
sky.job_status
#
- sky.job_status(cluster_name, job_ids=None)[源代码]
获取集群上作业的状态。
- 参数:
- 返回类型:
- 返回:
作业状态请求的请求 ID。
- 请求返回:
job_statuses (Dict[Optional[int], Optional[job_lib.JobStatus]]) – 作业 ID 到作业状态的映射。如果作业不存在,状态将为 None。如果 job_ids 为 None 且集群上没有作业,将返回 {None: None}。
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果集群不是基于
CloudVmRayBackend
。sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
sky.tail_logs
#
- sky.tail_logs(cluster_name, job_id, follow, tail=0, output_stream=None)[源代码]
跟踪作业日志。
- 参数:
- 返回类型:
- 返回:
根据作业成功或失败的退出码。成功为 0,作业失败为 100。有关可能的退出码,请参阅 exceptions.JobExitCode。
- 请求引发异常:
ValueError – 如果参数无效或不支持该集群。
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend。
sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
sky.download_logs
#
- sky.download_logs(cluster_name, job_ids)[源代码]
下载作业日志。
- 参数:
- 返回类型:
- 返回:
download_logs 请求的请求 ID。
- 请求返回:
job_log_paths (Dict[str, str]) – 作业 ID 到本地日志路径的映射。
- 请求引发异常:
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend。
sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
sky.cancel
#
- sky.cancel(cluster_name, all=False, all_users=False, job_ids=None, _try_cancel_if_cluster_is_init=False)[源代码]
取消集群上的作业。
- 参数:
- 返回类型:
- 返回:
取消请求的请求 ID。
- 请求返回:
None
- 请求引发异常:
ValueError – 如果参数无效。
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群状态不是 UP。
sky.exceptions.NotSupportedError – 如果指定的集群是控制器,且不支持此操作。
sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果获取当前用户身份失败。
托管作业 SDK#
sky.jobs.launch
#
- sky.jobs.launch(task, name=None, _need_confirmation=False)[源代码]
启动托管作业。
请参阅 sky.cli.job_launch 获取文档。
- 参数:
- 返回类型:
- 返回:
启动请求的请求 ID。
- 请求返回:
job_id (Optional[int]) – 托管作业的作业 ID。
controller_handle (Optional[ResourceHandle]) – 控制器的 ResourceHandle。
- 请求引发异常:
ValueError – 集群不存在。或者,入口点不是有效的链式 dag。
sky.exceptions.NotSupportedError – 不支持该功能。
sky.jobs.queue
#
- sky.jobs.queue(refresh, skip_finished=False, all_users=False)[源代码]
获取托管作业的状态。
请参阅 sky.cli.job_queue 获取文档。
- 参数:
- 返回类型:
- 返回:
queue 请求的请求 ID。
- 请求返回:
job_records (List[Dict[str, Any]]) – 一个字典列表,每个字典包含一个作业的信息。
[ { 'job_id': (int) job id, 'job_name': (str) job name, 'resources': (str) resources of the job, 'submitted_at': (float) timestamp of submission, 'end_at': (float) timestamp of end, 'duration': (float) duration in seconds, 'recovery_count': (int) Number of retries, 'status': (sky.jobs.ManagedJobStatus) of the job, 'cluster_resources': (str) resources of the cluster, 'region': (str) region of the cluster, } ]
- 请求引发异常:
sky.exceptions.ClusterNotUpError – 作业控制器未启动或不存在。
RuntimeError – 如果通过 ssh 获取托管作业失败。
sky.jobs.cancel
#
- sky.jobs.cancel(name=None, job_ids=None, all=False, all_users=False)[源代码]
取消托管作业。
请参阅 sky.cli.job_cancel 获取文档。
sky.jobs.tail_logs
#
- sky.jobs.tail_logs(name=None, job_id=None, follow=True, controller=False, refresh=False, output_stream=None)[源代码]
跟踪托管作业日志。
您可以提供作业名称或作业 ID 来跟踪日志。如果两者都未提供,将显示最新作业的日志。
- 参数:
- 返回类型:
- 返回:
根据作业成功或失败的退出码。成功为 0,作业失败为 100。有关可能的退出码,请参阅 exceptions.JobExitCode。
- 请求引发异常:
ValueError – 参数无效。
sky.exceptions.ClusterNotUpError – 作业控制器未启动。
服务 SDK#
sky.serve.up
#
- sky.serve.up(task, service_name, _need_confirmation=False)[源代码]
启动服务。
请参阅 sky.cli.serve_up 获取文档。
sky.serve.update
#
- sky.serve.update(task, service_name, mode, _need_confirmation=False)[源代码]
更新现有服务。
请参阅 sky.cli.serve_update 获取文档。
sky.serve.down
#
- sky.serve.down(service_names, all=False, purge=False)[源代码]
销毁服务。
请参阅 sky.cli.serve_down 获取文档。
sky.serve.terminate_replica
#
- sky.serve.terminate_replica(service_name, replica_id, purge)[源代码]
销毁给定服务的特定副本。
sky.serve.status
#
- sky.serve.status(service_names)[源代码]
获取服务状态。
如果指定了 service_names,则返回这些服务。否则,返回所有服务。
每个返回值具有以下字段:
{ 'name': (str) service name, 'active_versions': (List[int]) a list of versions that are active, 'controller_job_id': (int) the job id of the controller, 'uptime': (int) uptime in seconds, 'status': (sky.ServiceStatus) service status, 'controller_port': (Optional[int]) controller port, 'load_balancer_port': (Optional[int]) load balancer port, 'endpoint': (Optional[str]) endpoint of the service, 'policy': (Optional[str]) autoscaling policy description, 'requested_resources_str': (str) str representation of requested resources, 'load_balancing_policy': (str) load balancing policy name, 'replica_info': (List[Dict[str, Any]]) replica information, }
replica_info 中的每个条目具有以下字段:
{ 'replica_id': (int) replica id, 'name': (str) replica name, 'status': (sky.serve.ReplicaStatus) replica status, 'version': (int) replica version, 'launched_at': (int) timestamp of launched, 'handle': (ResourceHandle) handle of the replica cluster, 'endpoint': (str) endpoint of the replica, }
有关可能的服务状态和副本状态,请参阅 sky.cli.serve_status。
sky.serve.tail_logs
#
- sky.serve.tail_logs(service_name, target, replica_id=None, follow=True, output_stream=None)[源代码]
跟踪服务日志。
用法
sky.serve.tail_logs( service_name, target=<component>, follow=False, # Optionally, default to True # replica_id=3, # Must be specified when target is REPLICA. )
target
是一个枚举,类型为sky.serve.ServiceComponent
,可以是以下之一:sky.serve.ServiceComponent.CONTROLLER
sky.serve.ServiceComponent.LOAD_BALANCER
sky.serve.ServiceComponent.REPLICA
也支持将 target 作为小写字符串传递,例如
target='controller'
。要使用sky.serve.ServiceComponent.REPLICA
,您必须指定replica_id
。跟踪控制器日志
# follow default to True sky.serve.tail_logs( service_name, target=sky.serve.ServiceComponent.CONTROLLER )
打印副本 3 的日志
# Pass target as a lower-case string is also supported. sky.serve.tail_logs( service_name, target='replica', follow=False, replica_id=3 )
- 参数:
- 返回类型:
- 返回:
tail logs 请求的请求 ID。
- 请求引发异常:
sky.exceptions.ClusterNotUpError – Sky Serve 控制器未启动。
ValueError – 参数无效,或跟踪日志失败。
Task#
- class sky.Task(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None, file_mounts_mapping=None)[源代码]#
Task:要在云上运行的计算任务。
- __init__(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None, file_mounts_mapping=None)[source]#
初始化一个 Task。
所有字段都是可选的。
Task.run
是实际的程序:它可以是要运行的 shell 命令(字符串),或者是用于不同节点的命令生成器(lambda;见下文)。可选地,调用
Task.set_resources()
来设置此任务的资源要求。如果未设置,则假定为默认的仅 CPU 要求(与sky launch
相同)。此类中的所有设置方法
Task.set_*()
都返回self
,这意味着它们是流畅的 API,可以链式调用。示例
# A Task that will sync up local workdir '.', containing # requirements.txt and train.py. sky.Task(setup='pip install requirements.txt', run='python train.py', workdir='.') # An empty Task for provisioning a cluster. task = sky.Task(num_nodes=n).set_resources(...) # Chaining setters. sky.Task().set_resources(...).set_file_mounts(...)
- 参数:
setup (
Optional
[str
]) – 一个 setup 命令,它将在执行run
运行命令之前运行,并在workdir
下执行。run (
Union
[str
,Callable
[[int
,List
[str
]],Optional
[str
]],None
]) – 任务的实际命令。如果不是 None,则可以是 shell 命令(字符串)或命令生成器(可调用对象)。如果是后者,它必须接受节点排序和节点地址列表作为输入,并返回一个 shell 命令(字符串)(某些节点返回 None 是有效的,在这种情况下不会在这些节点上运行任何命令)。运行命令将在workdir
下执行。注意,命令生成器应该是一个自包含的 lambda 表达式。envs (
Optional
[Dict
[str
,str
]]) – 一个字典,包含在运行 setup 和 run 命令之前要设置的环境变量。workdir (
Optional
[str
]) – 本地工作目录。此目录将同步到远程 VM(s) 上的一个位置,并且setup
和run
命令将在该位置下运行(因此,它们在调用二进制文件时可以使用相对路径)。num_nodes (
Optional
[int
]) – 为此任务配置的节点数。如果为 None,则视为 1 个节点。如果 > 1,每个节点将执行其自己的 setup/run 命令,其中run
可以是一个字符串(表示所有节点获取相同的命令),也可以是一个 lambda(语义如上所述)。docker_image (
Optional
[str
]) – (实验性:仅在使用 LocalDockerBackend 时生效。)此任务将基于的基础 docker 镜像。默认为 'gpuci/miniforge-cuda:11.4-devel-ubuntu18.04'。blocked_resources (
Optional
[Iterable
[Resources
]]) – 此任务不能在其上运行的一组资源。
- static from_yaml(yaml_path)[source]#
从 task YAML 文件初始化一个任务。
示例
task = sky.Task.from_yaml('/path/to/task.yaml')
- 参数:
yaml_path (
str
) – 有效 task yaml 文件的文件路径。- Raises:
ValueError – 如果路径加载后是一个字符串而不是一个字典;或者存在任何其他解析错误。
- 返回类型:
- set_file_mounts(file_mounts)[source]#
为此任务设置文件挂载。
对于同步数据集、点文件等非常有用。
文件挂载是一个字典:
{remote_path: local_path/cloud URI}
。本地(或云端)文件/目录将同步到此任务将运行的远程 VM(s) 上的指定路径。源路径和目标路径都不能以斜杠结尾。
示例
task.set_file_mounts({ '~/.dotfile': '/local/.dotfile', # /remote/dir/ will contain the contents of /local/dir/. '/remote/dir': '/local/dir', })
- update_file_mounts(file_mounts)[source]#
更新此任务的文件挂载。
与 set_file_mounts() 不同,此函数是更新到现有的 file_mounts 中(调用
dict.update()
),而不是覆盖它。应在配置之前调用此函数才能生效。
示例
task.update_file_mounts({ '~/.config': '~/Documents/config', '/tmp/workdir': '/local/workdir/cnn-cifar10', })
- 参数:
file_mounts (
Dict
[str
,str
]) – 一个可选的字典,格式为{remote_path: local_path/cloud URI}
,其中 remote 表示此任务最终将在其上运行的 VM(s),local 表示任务启动的节点。- 返回类型:
- 返回:
self – 当前任务,文件挂载已更新。
- Raises:
ValueError – 如果输入路径无效。
- set_storage_mounts(storage_mounts)[source]#
为此任务设置存储挂载。
存储挂载是一个字典:
{mount_path: sky.Storage object}
,其中每个条目都将一个 sky.Storage 对象(云对象存储桶)挂载到远程集群内的某个路径。可以通过从本地目录上传(设置
source
)来创建 sky.Storage 对象,或者由现有云存储桶支持(将name
设置为存储桶名称;或将source
设置为存储桶 URI)。示例
task.set_storage_mounts({ '/remote/imagenet/': sky.Storage(name='my-bucket', source='/local/imagenet'), })
- 参数:
storage_mounts (
Optional
[Dict
[str
,Storage
]]) – 一个可选的字典,格式为{mount_path: sky.Storage object}
,其中 mount_path 是 Storage 对象将挂载到的远程 VM(s) 内部的路径。- 返回类型:
- 返回:
self – 当前任务,存储挂载已设置。
- Raises:
ValueError – 如果输入路径无效。
- update_storage_mounts(storage_mounts)[source]#
更新此任务的存储挂载。
与 set_storage_mounts() 不同,此函数是更新到现有的 storage_mounts 中(调用
dict.update()
),而不是覆盖它。应在配置之前调用此函数才能生效。
- 参数:
storage_mounts (
Dict
[str
,Storage
]) – 一个可选的字典,格式为{mount_path: sky.Storage object}
,其中 mount_path 是 Storage 对象将挂载到的远程 VM(s) 内部的路径。- 返回类型:
- 返回:
self – 当前任务,存储挂载已更新。
- Raises:
ValueError – 如果输入路径无效。
资源#
- class sky.Resources(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, autostop=None, _docker_login_config=None, _docker_username_for_runpod=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#
资源:任务的计算要求。
此类一旦创建便是不可变的(以确保在属性更改时进行一些验证)。要更新 Resources 实例的属性,请使用
resources.copy(**new_properties)
。用于
表示任务/应用的资源请求
作为获取具体可启动实例的“过滤器”
计算计费
在云上进行配置
- __init__(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, autostop=None, _docker_login_config=None, _docker_username_for_runpod=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#
初始化一个 Resources 对象。
所有字段都是可选的。
Resources.is_launchable
决定了 Resources 是否完全指定以启动实例。示例
# Fully specified cloud and instance type (is_launchable() is True). sky.Resources(clouds.AWS(), 'p3.2xlarge') sky.Resources(clouds.GCP(), 'n1-standard-16') sky.Resources(clouds.GCP(), 'n1-standard-8', 'V100') # Specifying required resources; the system decides the # cloud/instance type. The below are equivalent: sky.Resources(accelerators='V100') sky.Resources(accelerators='V100:1') sky.Resources(accelerators={'V100': 1}) sky.Resources(cpus='2+', memory='16+', accelerators='V100')
- 参数:
cloud (
Optional
[Cloud
]) – 要使用的云。cpus (
Union
[None
,int
,float
,str
]) – 任务所需的 CPU 数量。如果为字符串,则必须是'2'
或'2+'
形式的字符串,其中+
表示任务至少需要 2 个 CPU。memory (
Union
[None
,int
,float
,str
]) – 所需内存量,单位为 GiB。如果为字符串,则必须是'16'
或'16+'
形式的字符串,其中+
表示任务至少需要 16 GB 内存。accelerators (
Union
[None
,str
,Dict
[str
,Union
[int
,float
]]]) – 所需的加速器。如果为字符串,则必须是'V100'
或'V100:2'
形式的字符串,其中:2
表示任务需要 2 个 V100 GPU。如果为字典,则必须是{'V100': 2}
或{'tpu-v2-8': 1}
形式的字典。accelerator_args (
Optional
[Dict
[str
,str
]]) – 加速器特定参数。例如,对于 TPU,为{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}
。use_spot (
Optional
[bool
]) – 是否使用 Spot 实例。如果为 None,则默认为 False。job_recovery (
Union
[Dict
[str
,Union
[int
,str
,None
]],str
,None
]) –用于托管作业从抢占中恢复集群的作业恢复策略。请参考 recovery_strategy module # pylint: disable=line-too-long 了解更多详情。当提供字典时,它可以包含以下字段
strategy: 要使用的恢复策略。
max_restarts_on_errors: 用户代码错误时的最大重启次数。
image_id (
Union
[Dict
[Optional
[str
],str
],str
,None
]) –要使用的镜像 ID。如果为字符串,则必须是云提供的镜像 ID 字符串,例如 AWS:
'ami-1234567890abcdef0'
,GCP:'projects/my-project-id/global/images/my-image-name'
;或者 SkyPilot 提供的镜像标签,例如 AWS:'skypilot:gpu-ubuntu-2004'
。如果为字典,则必须是将区域映射到镜像 ID 的字典,例如{ 'us-west1': 'ami-1234567890abcdef0', 'us-east1': 'ami-1234567890abcdef0' }
disk_tier (
Union
[str
,DiskTier
,None
]) – 要使用的磁盘性能层级。如果为 None,则默认为'medium'
。ports (
Union
[int
,str
,List
[str
],Tuple
[str
],None
]) – 要在实例上打开的端口。labels (
Optional
[Dict
[str
,str
]]) – 要应用于实例的标签。这些标签对于分配可供外部工具使用的元数据非常有用。实现取决于所选择的云 - 在 AWS 上,标签映射到实例标签 (instance tags)。在 GCP 上,标签映射到实例标签 (instance labels)。在 Kubernetes 上,标签映射到 Pod 标签 (pod labels)。在其他云上,不支持标签并将被忽略。autostop (
Union
[bool
,int
,Dict
[str
,Any
],None
]) – 要使用的自动停止配置。对于已启动的资源,可能与实际的当前自动停止配置不对应。_docker_login_config (
Optional
[DockerLoginConfig
]) – 要使用的 docker 配置。这包括 docker 用户名、密码和注册表服务器。如果为 None,则跳过 docker 登录。_docker_username_for_runpod (
Optional
[str
]) – docker 容器的登录用户名。RunPod 使用此用户名设置 docker 容器的 ssh 用户。_requires_fuse (
Optional
[bool
]) – 任务是否需要 FUSE 挂载支持。某些云实现内部使用此参数进行 FUSE 挂载的额外设置。此标志还防止在不支持 FUSE 挂载的现有集群上使用 FUSE 挂载。如果为 None,则默认为 False。
- Raises:
ValueError – 如果某些属性无效。
exceptions.NoCloudAccessError – 如果没有启用公共云。
枚举#
- class sky.ClusterStatus(value)[source]#
本地缓存中记录的集群状态。
这可能与实际集群状态不同,可以通过运行
sky status --refresh
来刷新。- INIT = 'INIT'#
正在初始化。
这意味着配置已开始但尚未成功完成。集群可能正在进行 setup,可能 setup 失败,可能处于活动状态或已关闭。
- UP = 'UP'#
集群已启动。这意味着之前的配置已成功完成。
- STOPPED = 'STOPPED'#
集群已停止。
- class sky.JobStatus(value)[source]#
作业状态枚举。
- INIT = 'INIT'#
作业已提交,但尚未开始。
- PENDING = 'PENDING'#
作业正在等待所需的资源。
- SETTING_UP = 'SETTING_UP'#
作业正在运行用户的 setup 脚本。
- RUNNING = 'RUNNING'#
作业正在运行。
- FAILED_DRIVER = 'FAILED_DRIVER'#
作业驱动程序进程失败。
- SUCCEEDED = 'SUCCEEDED'#
作业成功完成。
- FAILED = 'FAILED'#
由于用户代码,作业失败。
- FAILED_SETUP = 'FAILED_SETUP'#
作业 setup 失败。
- CANCELLED = 'CANCELLED'#
作业被用户取消。
API 服务器 SDK#
sky.get
#
sky.stream_and_get
#
sky.api_status
#
sky.api_cancel
#
- sky.api_cancel(request_ids=None, all_users=False, silent=False)[source]
中止请求或所有请求。