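Admin Policy Enforcement#
SkyPilot provides an admin policy mechanism that admins can use to enforce certain policies on users' SkyPilot usage. An admin policy applies custom validation and mutation logic to a user's tasks and SkyPilot config.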
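Example usage: see the example policies at the end of this page.
To implement and use an admin policy: (1) admins write a simple Python package containing a policy class that implements SkyPilot's sky.AdminPolicy interface; (2) admins distribute this package to users; (3) users set the admin_policy field in the SkyPilot config file ~/.sky/config.yaml, and the policy takes effect.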
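Overview#
User-Side#
To apply a policy, a user sets the admin_policy field in the SkyPilot config file ~/.sky/config.yaml to the dotted import path of the Python class that implements the policy. For example: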
admin_policy: mypackage.subpackage.MyPolicy
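Tip
SkyPilot loads the policy from the given package in the same Python environment that SkyPilot itself runs in. You can check that the policy is importable by running: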
python -c "from mypackage.subpackage import MyPolicy"
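The same check can be sketched in Python by splitting the dotted admin_policy value into a module path and a class name. This is only an illustration under stated assumptions, not SkyPilot's actual loading code: `resolve_dotted_path` is a hypothetical helper, and a standard-library class stands in for a real policy class.

```python
import importlib


def resolve_dotted_path(path: str):
    """Imports `pkg.module.Name` and returns the object called `Name`.

    Hypothetical helper for illustration; not part of SkyPilot.
    """
    module_path, _, attr_name = path.rpartition('.')
    if not module_path:
        raise ValueError(f'Expected "package.ClassName", got {path!r}')
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as e:
        raise ImportError(
            f'{attr_name!r} not found in module {module_path!r}') from e


if __name__ == '__main__':
    # A standard-library class stands in for mypackage.subpackage.MyPolicy.
    print(resolve_dotted_path('collections.OrderedDict'))
```

Running a check like this in the environment where SkyPilot is installed surfaces a missing package or a misspelled class name before any request is submitted.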
Admin-Side#
Admins can distribute a Python package containing predefined policies to users. A policy should implement the sky.AdminPolicy interface:
class AdminPolicy:
    """Abstract interface of an admin-defined policy for all user requests.

    Admins can implement a subclass of AdminPolicy with the following signature:

        import sky

        class SkyPilotPolicyV1(sky.AdminPolicy):
            def validate_and_mutate(user_request: UserRequest) -> MutatedUserRequest:
                ...
                return MutatedUserRequest(task=..., skypilot_config=...)

    The policy can mutate both task and skypilot_config. Admins then distribute
    a simple module that contains this implementation, installable in a way
    that it can be imported by users from the same Python environment where
    SkyPilot is running.

    Users can register a subclass of AdminPolicy in the SkyPilot config file
    under the key 'admin_policy', e.g.

        admin_policy: my_package.SkyPilotPolicyV1
    """

    @classmethod
    @abc.abstractmethod
    def validate_and_mutate(cls,
                            user_request: UserRequest) -> MutatedUserRequest:
        """Validates and mutates the user request and returns mutated request.

        Args:
            user_request: The user request to validate and mutate.
                UserRequest contains (sky.Task, sky.Config)

        Returns:
            MutatedUserRequest: The mutated user request.

        Raises:
            Exception to throw if the user request failed the validation.
        """
        raise NotImplementedError(
            'Your policy must implement validate_and_mutate')
Your custom admin policy should look like this:
import sky


class MyPolicy(sky.AdminPolicy):

    @classmethod
    def validate_and_mutate(cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        # Logic for validating and modifying user requests.
        ...
        return sky.MutatedUserRequest(user_request.task,
                                      user_request.skypilot_config)
UserRequest and MutatedUserRequest are defined as follows (see the source code for details):
@dataclasses.dataclass
class UserRequest:
    """A user request.

    A "user request" is defined as a `sky launch / exec` command or its API
    equivalent.

    `sky jobs launch / serve up` involves multiple launch requests, including
    the launch of controller and clusters for a job (which can have multiple
    tasks if it is a pipeline) or service replicas. Each launch is a separate
    request.

    This class wraps the underlying task, the global skypilot config used to run
    a task, and the request options.

    Args:
        task: User specified task.
        skypilot_config: Global skypilot config to be used in this request.
        request_options: Request options. It is None for jobs and services.
    """
    task: 'sky.Task'
    skypilot_config: 'sky.Config'
    request_options: Optional['RequestOptions'] = None


@dataclasses.dataclass
class MutatedUserRequest:
    task: 'sky.Task'
    skypilot_config: 'sky.Config'
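In other words, an AdminPolicy can mutate any field of a user request, including the task and the global SkyPilot config, which gives admins a lot of flexibility to control how users use SkyPilot.
An AdminPolicy can both validate and mutate user requests. If a request should be rejected, the policy should raise an exception.
The sky.Config and sky.RequestOptions classes are defined as follows: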
class Config(Dict[str, Any]):
    """SkyPilot config that supports setting/getting values with nested keys."""

    def get_nested(
            self,
            keys: Tuple[str, ...],
            default_value: Any,
            override_configs: Optional[Dict[str, Any]] = None,
            allowed_override_keys: Optional[List[Tuple[str, ...]]] = None,
            disallowed_override_keys: Optional[List[Tuple[str,
                                                          ...]]] = None) -> Any:
        """Gets a nested key.

        If any key is not found, or any intermediate key does not point to a
        dict value, returns 'default_value'.

        Args:
            keys: A tuple of strings representing the nested keys.
            default_value: The default value to return if the key is not found.
            override_configs: A dict of override configs with the same schema as
                the config file, but only containing the keys to override.
            allowed_override_keys: A list of keys that are allowed to be
                overridden.
            disallowed_override_keys: A list of keys that are disallowed to be
                overridden.

        Returns:
            The value of the nested key, or 'default_value' if not found.
        """
        config = copy.deepcopy(self)
        if override_configs is not None:
            config = _recursive_update(config, override_configs,
                                       allowed_override_keys,
                                       disallowed_override_keys)
        return _get_nested(config, keys, default_value, pop=False)

    def set_nested(self, keys: Tuple[str, ...], value: Any) -> None:
        """In-place sets a nested key to value.

        Like get_nested(), if any key is not found, this will not raise an
        error.
        """
        override = {}
        for i, key in enumerate(reversed(keys)):
            if i == 0:
                override = {key: value}
            else:
                override = {key: override}
        _recursive_update(self, override)

    def pop_nested(self, keys: Tuple[str, ...], default_value: Any) -> Any:
        """Pops a nested key."""
        return _get_nested(self, keys, default_value, pop=True)

    @classmethod
    def from_dict(cls, config: Optional[Dict[str, Any]]) -> 'Config':
        if config is None:
            return cls()
        return cls(**config)


@dataclasses.dataclass
class RequestOptions:
    """Request options for admin policy.

    Args:
        cluster_name: Name of the cluster to create/reuse. It is None if not
            specified by the user.
        idle_minutes_to_autostop: Autostop setting requested by a user. The
            cluster will be set to autostop after this many minutes of idleness.
        down: If true, use autodown rather than autostop.
        dryrun: Is the request a dryrun?
    """
    cluster_name: Optional[str]
    idle_minutes_to_autostop: Optional[int]
    down: bool
    dryrun: bool
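The nested-key semantics described in the Config docstrings can be illustrated with plain dictionaries. The sketch below is a simplified stand-in, not SkyPilot's implementation: `get_nested` and `set_nested` here are hypothetical free functions that mimic the documented behavior (return the default when a key is missing or an intermediate value is not a dict; never raise on a missing path when setting) and ignore the override arguments. The real set_nested goes through a recursive update helper that is not shown on this page, so its merge behavior may differ.

```python
from typing import Any, Dict, Tuple


def get_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               default_value: Any) -> Any:
    """Returns config[k0][k1]..., or default_value if the path is missing."""
    current: Any = config
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default_value
        current = current[key]
    return current


def set_nested(config: Dict[str, Any], keys: Tuple[str, ...],
               value: Any) -> None:
    """Sets config[k0][k1]... = value in place, creating dicts as needed."""
    current = config
    for key in keys[:-1]:
        if not isinstance(current.get(key), dict):
            current[key] = {}
        current = current[key]
    current[keys[-1]] = value


# A plain dict stands in for sky.Config.
config: Dict[str, Any] = {'aws': {'vpc_name': 'my-vpc'}}
set_nested(config, ('aws', 'use_internal_ip'), True)
set_nested(config, ('kubernetes', 'custom_metadata', 'labels'),
           {'app': 'skypilot'})
print(get_nested(config, ('aws', 'vpc_name'), None))
print(get_nested(config, ('gcp', 'vpc_name'), None))
```

Key paths are tuples of strings, so a policy can read or write a deeply nested setting without first checking that every intermediate level exists.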
Note
A sky.AdminPolicy should be idempotent. In other words, it should be safe to apply the policy to the same user request multiple times.
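To make the idempotency requirement concrete, the sketch below contrasts two mutations on a plain dictionary that stands in for the config. The function names and the `labels` and `setup_commands` keys are hypothetical and exist only for this illustration. Setting a key to a fixed value gives the same result however many times it runs, while blindly appending to a list does not.

```python
import copy
from typing import Any, Callable, Dict


def add_label_idempotent(config: Dict[str, Any]) -> Dict[str, Any]:
    """Sets a fixed label; applying it twice equals applying it once."""
    new_config = copy.deepcopy(config)
    new_config.setdefault('labels', {})['app'] = 'skypilot'
    return new_config


def append_setup_not_idempotent(config: Dict[str, Any]) -> Dict[str, Any]:
    """Appends blindly; every application grows the list."""
    new_config = copy.deepcopy(config)
    new_config.setdefault('setup_commands', []).append('echo hello')
    return new_config


def is_idempotent_on(mutate: Callable[[Dict[str, Any]], Dict[str, Any]],
                     config: Dict[str, Any]) -> bool:
    """Checks mutate(mutate(config)) == mutate(config) for one input."""
    once = mutate(config)
    return mutate(once) == once


print(is_idempotent_on(add_label_idempotent, {}))
print(is_idempotent_on(append_setup_not_idempotent, {}))
```

A check in the style of `is_idempotent_on` can be added to a policy package's unit tests to catch mutations that accumulate across repeated applications.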
Example Policies#
We provide several example policies in examples/admin_policy/example_policy. You can try them out by installing the example policy package in your Python environment:
git clone https://github.com/skypilot-org/skypilot.git
cd skypilot
pip install examples/admin_policy/example_policy
Reject All Tasks#
import sky


class RejectAllPolicy(sky.AdminPolicy):
    """Example policy: rejects all user requests."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Rejects all user requests."""
        raise RuntimeError('Reject all policy')
admin_policy: example_policy.RejectAllPolicy
Add Labels for All Tasks on Kubernetes#
import sky


class AddLabelsPolicy(sky.AdminPolicy):
    """Example policy: adds a kubernetes label for skypilot_config."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        labels = config.get_nested(('kubernetes', 'custom_metadata', 'labels'),
                                   {})
        labels['app'] = 'skypilot'
        config.set_nested(('kubernetes', 'custom_metadata', 'labels'), labels)
        return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.AddLabelsPolicy
Always Disable Public IP for AWS Tasks#
import sky


class DisablePublicIpPolicy(sky.AdminPolicy):
    """Example policy: disables public IP for all AWS tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        config = user_request.skypilot_config
        config.set_nested(('aws', 'use_internal_ip'), True)
        if config.get_nested(('aws', 'vpc_name'), None) is None:
            # If no VPC name is specified, it is likely a mistake. We should
            # reject the request.
            raise RuntimeError('VPC name should be set. Check organization '
                               'wiki for more information.')
        return sky.MutatedUserRequest(user_request.task, config)
admin_policy: example_policy.DisablePublicIpPolicy
Use Spot for All GPU Tasks#
import sky


class UseSpotForGpuPolicy(sky.AdminPolicy):
    """Example policy: use spot instances for all GPU tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Sets use_spot to True for all GPU tasks."""
        task = user_request.task
        new_resources = []
        for r in task.resources:
            if r.accelerators:
                new_resources.append(r.copy(use_spot=True))
            else:
                new_resources.append(r)

        task.set_resources(type(task.resources)(new_resources))

        return sky.MutatedUserRequest(
            task=task, skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.UseSpotForGpuPolicy
Enforce Autostop for All Tasks#
import sky


class EnforceAutostopPolicy(sky.AdminPolicy):
    """Example policy: enforce autostop for all tasks."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Enforces autostop for all tasks.

        Note that with this policy enforced, users can still change the autostop
        setting for an existing cluster by using `sky autostop`.

        Since we refresh the cluster status with `sky.status` whenever this
        policy is applied, we should expect a few seconds latency when a user
        runs a request.
        """
        request_options = user_request.request_options

        # Request options is None when a task is executed with `jobs launch` or
        # `sky serve up`.
        if request_options is None:
            return sky.MutatedUserRequest(
                task=user_request.task,
                skypilot_config=user_request.skypilot_config)

        # Get the cluster record to operate on.
        cluster_name = request_options.cluster_name
        cluster_records = []
        if cluster_name is not None:
            cluster_records = sky.status(cluster_name,
                                         refresh=True,
                                         all_users=True)

        # Check if the user request should specify autostop settings.
        need_autostop = False
        if not cluster_records:
            # Cluster does not exist
            need_autostop = True
        elif cluster_records[0]['status'] == sky.ClusterStatus.STOPPED:
            # Cluster is stopped
            need_autostop = True
        elif cluster_records[0]['autostop'] < 0:
            # Cluster is running but autostop is not set
            need_autostop = True

        # Check if the user request is setting autostop settings.
        is_setting_autostop = False
        idle_minutes_to_autostop = request_options.idle_minutes_to_autostop
        is_setting_autostop = (idle_minutes_to_autostop is not None and
                               idle_minutes_to_autostop >= 0)

        # If the cluster requires autostop but the user request is not setting
        # autostop settings, raise an error.
        if need_autostop and not is_setting_autostop:
            raise RuntimeError('Autostop/down must be set for all clusters.')

        return sky.MutatedUserRequest(
            task=user_request.task,
            skypilot_config=user_request.skypilot_config)
admin_policy: example_policy.EnforceAutostopPolicy
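The decision logic of the autostop policy can be pulled into pure functions so that it can be unit-tested without a running cluster. The sketch below is a stand-in under stated assumptions: `needs_autostop`, `sets_autostop`, and `check_request` are hypothetical names, cluster records are plain dicts with the same `status` and `autostop` keys the policy reads, and the string `'STOPPED'` replaces `sky.ClusterStatus.STOPPED`.

```python
from typing import Any, Dict, List, Optional


def needs_autostop(cluster_records: List[Dict[str, Any]]) -> bool:
    """True if the request must carry an autostop setting."""
    if not cluster_records:
        return True  # Cluster does not exist.
    record = cluster_records[0]
    if record['status'] == 'STOPPED':
        return True  # Cluster is stopped.
    return record['autostop'] < 0  # Running, but autostop is not set.


def sets_autostop(idle_minutes_to_autostop: Optional[int]) -> bool:
    """True if the request asks for a non-negative autostop timeout."""
    return (idle_minutes_to_autostop is not None and
            idle_minutes_to_autostop >= 0)


def check_request(cluster_records: List[Dict[str, Any]],
                  idle_minutes_to_autostop: Optional[int]) -> None:
    """Raises RuntimeError when autostop is required but not requested."""
    if (needs_autostop(cluster_records) and
            not sets_autostop(idle_minutes_to_autostop)):
        raise RuntimeError('Autostop/down must be set for all clusters.')
```

With this split, the policy method only has to gather the cluster records and request options, and the accept/reject rule can be covered by a small table of test cases.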
Dynamically Update Kubernetes Contexts to Use#
import sky


class DynamicKubernetesContextsUpdatePolicy(sky.AdminPolicy):
    """Example policy: update the kubernetes context to use."""

    @classmethod
    def validate_and_mutate(
            cls, user_request: sky.UserRequest) -> sky.MutatedUserRequest:
        """Updates the kubernetes context to use."""
        # Append any new kubernetes clusters in local kubeconfig. An example
        # implementation of this method can be:
        #  1. Query an organization's internal Kubernetes cluster registry,
        #     which can be some internal API, or a secret vault.
        #  2. Append the new credentials to the local kubeconfig.
        update_current_kubernetes_clusters_from_registry()
        # Get the allowed contexts for the user. Similarly, it can retrieve
        # the latest allowed contexts from an organization's internal API.
        allowed_contexts = get_allowed_contexts()

        # Update the kubernetes allowed contexts in skypilot config.
        config = user_request.skypilot_config
        config.set_nested(('kubernetes', 'allowed_contexts'), allowed_contexts)
        return sky.MutatedUserRequest(task=user_request.task,
                                      skypilot_config=config)
admin_policy: example_policy.DynamicKubernetesContextsUpdatePolicy
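The policy above calls get_allowed_contexts() without showing its body. Because a policy runs on every request, one possible shape for such a helper is a small time-based cache in front of the slow lookup. Everything in the sketch below is an assumption for illustration: the `CachedContexts` class, the 60-second default, and `fetch_contexts_from_registry`, which returns hard-coded names instead of calling a real internal API.

```python
import time
from typing import Callable, List, Optional


class CachedContexts:
    """Caches the result of a slow lookup for ttl_seconds."""

    def __init__(self,
                 fetch: Callable[[], List[str]],
                 ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl_seconds = ttl_seconds
        self._clock = clock
        self._cached: Optional[List[str]] = None
        self._fetched_at = 0.0

    def get(self) -> List[str]:
        """Returns the cached list, refetching if missing or expired."""
        now = self._clock()
        expired = now - self._fetched_at >= self._ttl_seconds
        if self._cached is None or expired:
            self._cached = list(self._fetch())
            self._fetched_at = now
        # Return a copy so callers cannot modify the cached list.
        return list(self._cached)


def fetch_contexts_from_registry() -> List[str]:
    """Hypothetical stand-in for a call to an internal registry API."""
    return ['dev-cluster', 'prod-cluster']


_allowed_contexts = CachedContexts(fetch_contexts_from_registry)


def get_allowed_contexts() -> List[str]:
    """Returns the allowed Kubernetes contexts, cached for a short time."""
    return _allowed_contexts.get()
```

The injectable `clock` argument is there only to make expiry testable without sleeping; whether caching is appropriate at all depends on how quickly access changes need to take effect.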