# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Utility functions for workflows."""
import functools
from collections.abc import Collection as AbcCollection
from collections.abc import Iterable, Sequence
from operator import attrgetter
from typing import Any, TYPE_CHECKING, get_args
from django.db.models import QuerySet
from debusine.artifacts import SourcePackage
from debusine.artifacts.models import (
ArtifactCategory,
ArtifactDataWithArch,
BareDataCategory,
CollectionCategory,
DebianBinaryPackage,
DebianPackageBuildLog,
DebianSourcePackage,
DebianSystemTarball,
DebianUpload,
DebusinePromise,
get_architecture,
get_source_package_name,
)
from debusine.client.models import LookupChildType
from debusine.db.models import (
Artifact,
ArtifactRelation,
CollectionItem,
TaskDatabase,
)
from debusine.server.collections.lookup import (
LookupResult,
lookup_multiple,
lookup_single,
reconstruct_lookup,
)
from debusine.tasks import BaseTask, TaskConfigError, get_environment
from debusine.tasks.models import (
BackendType,
ExtraExternalRepository,
ExtraRepository,
LookupMultiple,
LookupSingle,
)
if TYPE_CHECKING:
from debusine.server.workflows import Workflow
[docs]
@functools.lru_cache(maxsize=100)
def source_package(
    workflow: "Workflow[Any, Any]",
    *,
    configuration_key: str = "source_artifact",
) -> Artifact:
    """
    Resolve the source package artifact named by ``configuration_key``.

    The lookup stored under ``configuration_key`` in the workflow's task
    data is resolved to a single artifact.  When that artifact is a
    :artifact:`debian:upload`, its :artifact:`debian:source-package` is
    returned instead.

    Results are memoised by the ``lru_cache`` decorator, keyed on the
    workflow and the configuration key.
    """
    # attrgetter also accepts dotted attribute paths.
    configured_lookup = attrgetter(configuration_key)(workflow.data)
    resolved = lookup_single(
        configured_lookup,
        workflow.workspace,
        user=workflow.work_request.created_by,
        workflow_root=workflow.work_request.workflow_root,
        expect_type=LookupChildType.ARTIFACT,
    )
    return locate_debian_source_package(configuration_key, resolved.artifact)
[docs]
@functools.lru_cache(maxsize=100)
def source_package_data(
    workflow: "Workflow[Any, Any]",
    *,
    configuration_key: str = "source_artifact",
) -> DebianSourcePackage:
    """
    Return the typed data of the workflow's source package artifact.

    The artifact is located with :py:func:`source_package` using the same
    ``configuration_key``; its raw data is then turned into a data model
    through ``SourcePackage.create_data``.
    """
    artifact = source_package(workflow, configuration_key=configuration_key)
    return SourcePackage.create_data(artifact.data)
[docs]
def lookup_result_artifact_category(result: LookupResult) -> str:
    """
    Return the artifact category described by a lookup result.

    An artifact result yields the artifact's own category.  A bare
    collection item of the promise category yields the
    ``promise_category`` of its data parsed as a promise.

    :raises ValueError: for any other kind of result.
    """
    kind = result.result_type
    if kind == CollectionItem.Types.ARTIFACT and result.artifact is not None:
        return result.artifact.category

    is_promise = (
        kind == CollectionItem.Types.BARE
        and result.collection_item is not None
        and result.collection_item.category == BareDataCategory.PROMISE
    )
    if not is_promise:
        raise ValueError(
            f"Cannot determine artifact category for lookup result: {result}"
        )

    promise = DebusinePromise(**result.collection_item.data)
    return promise.promise_category
[docs]
def lookup_result_architecture(result: LookupResult) -> str:
    """
    Get architecture from result of looking up an artifact.

    When the result carries an artifact, the architecture is derived from
    the artifact's typed data.  Otherwise it is read from the
    ``architecture`` key of the collection item's data.

    :param result: lookup result holding an artifact or a collection item.
    :return: the architecture name.
    :raises ValueError: if the artifact's data is not one of the
      ``ArtifactDataWithArch`` types, if the collection item's data has no
      string ``architecture``, or if the result has neither an artifact
      nor a collection item.
    """
    if result.artifact is not None:
        artifact_data = result.artifact.create_data()
        if not isinstance(artifact_data, get_args(ArtifactDataWithArch)):
            raise ValueError(f"Unexpected type: {type(artifact_data).__name__}")
        return get_architecture(artifact_data)

    if result.collection_item is not None:
        architecture = result.collection_item.data.get("architecture")
        # isinstance (not an exact type() comparison) so that str subclasses
        # are accepted, as in lookup_result_binary_package_name.
        if not isinstance(architecture, str):
            raise ValueError(
                f"Cannot determine architecture for lookup result: {result}"
            )
        return architecture

    raise ValueError(
        "Unexpected result: must have collection_item or artifact"
    )
[docs]
class ArtifactHasNoBinaryPackageName(Exception):
    """The artifact's binary package name cannot be determined."""
[docs]
def lookup_result_binary_package_name(result: LookupResult) -> str:
    """
    Return the binary package name described by a lookup result.

    For an artifact, the name is the ``Package`` field of its binary
    package data.  For a collection item, it is the
    ``binary_package_name`` key of the item's data.

    :raises ArtifactHasNoBinaryPackageName: if the artifact's data is not
      binary package data.
    :raises ValueError: if the result has neither an artifact nor a
      collection item, or if the name found is not a string.
    """
    name: str | None
    if result.artifact is not None:
        data = result.artifact.create_data()
        if not isinstance(data, DebianBinaryPackage):
            raise ArtifactHasNoBinaryPackageName(f"{type(data)}")
        name = data.deb_fields.get("Package")
    elif result.collection_item is not None:
        name = result.collection_item.data.get("binary_package_name")
    else:
        raise ValueError(
            "Unexpected result: must have collection_item or artifact"
        )

    if isinstance(name, str):
        return name
    raise ValueError(
        f"Cannot determine binary package name for lookup result: {result}"
    )
[docs]
def filter_artifact_lookup_by_arch(
    workflow: "Workflow[Any, Any]",
    lookup: LookupMultiple,
    architectures: Iterable[str],
) -> LookupMultiple:
    """
    Filter an artifact lookup by architecture.

    :param workflow: workflow whose workspace, creating user and workflow
      root are used to resolve ``lookup``.
    :param lookup: lookup resolved with artifacts or promises expected.
    :param architectures: architectures to keep.  Any iterable is
      accepted, including one-shot iterators such as generators.
    :return: a lookup built from the sorted reconstructed lookups of the
      results whose architecture is in ``architectures``.
    :raises ValueError: if the architecture of a result cannot be
      determined by :py:func:`lookup_result_architecture`.
    """
    # ``in`` is evaluated once per result below.  Against a one-shot
    # iterator the first test would consume it and later matches would be
    # silently dropped, so materialise anything that is not already a
    # re-usable container.  Containers keep their own membership semantics.
    if not isinstance(architectures, AbcCollection):
        architectures = frozenset(architectures)

    results = lookup_multiple(
        lookup,
        workflow.workspace,
        user=workflow.work_request.created_by,
        workflow_root=workflow.work_request.workflow_root,
        expect_type=LookupChildType.ARTIFACT_OR_PROMISE,
    )
    relevant = [
        reconstruct_lookup(
            result, workflow_root=workflow.work_request.workflow_root
        )
        for result in results
        if lookup_result_architecture(result) in architectures
    ]
    return LookupMultiple.parse_obj(sorted(relevant))
[docs]
def get_architectures(
    workflow: "Workflow[Any, Any]", lookup: LookupMultiple
) -> set[str]:
    """
    Collect the architectures of everything the lookup resolves to.

    The lookup is resolved with artifacts or promises expected, and each
    result is passed to :py:func:`lookup_result_architecture`.
    """
    found: set[str] = set()
    for result in lookup_multiple(
        lookup,
        workflow.workspace,
        user=workflow.work_request.created_by,
        workflow_root=workflow.work_request.workflow_root,
        expect_type=LookupChildType.ARTIFACT_OR_PROMISE,
    ):
        found.add(lookup_result_architecture(result))
    return found
[docs]
def follow_artifact_relations(
    artifact: Artifact,
    relation_type: ArtifactRelation.Relations,
    category: ArtifactCategory,
) -> QuerySet[Artifact]:
    """
    Return artifacts of ``category`` that ``artifact`` relates to.

    Only relations of ``relation_type`` originating from ``artifact`` are
    followed.  The resulting queryset is ordered by ``id``.
    """
    # All conditions stay in one filter() call so the two ``targeted_by``
    # lookups are expressed together.
    related = Artifact.objects.filter(
        targeted_by__artifact=artifact,
        targeted_by__type=relation_type,
        category=category,
    )
    return related.order_by("id")
[docs]
def locate_debian_source_package(
configuration_key: str, artifact: Artifact
) -> Artifact:
"""
Find a :artifact:`debian:source-package` artifact for a workflow.
:param configuration_key: The key in the workflow's task data from which
the given artifact was looked up.
:param artifact: A :artifact:`debian:source-package` or
:artifact:`debian:upload` artifact.
:return: If ``artifact`` is a
:artifact:`debian:source-package`, return it; if it is a
:artifact:`debian:upload`, return the related
:artifact:`debian:source-package`.
"""
BaseTask.ensure_artifact_categories(
configuration_key=configuration_key,
category=artifact.category,
expected=[ArtifactCategory.SOURCE_PACKAGE, ArtifactCategory.UPLOAD],
)
match artifact.category:
case ArtifactCategory.SOURCE_PACKAGE:
return artifact
case ArtifactCategory.UPLOAD:
source_packages = follow_artifact_relations(
artifact,
ArtifactRelation.Relations.EXTENDS,
ArtifactCategory.SOURCE_PACKAGE,
)
try:
return source_packages.get()
except Artifact.DoesNotExist:
raise TaskConfigError(
f"Unable to find an artifact of category "
f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
f"of type {ArtifactRelation.Relations.EXTENDS} from "
f'"{artifact}"'
)
except Artifact.MultipleObjectsReturned:
raise TaskConfigError(
f"Multiple artifacts of category "
f"{ArtifactCategory.SOURCE_PACKAGE} with a relationship "
f"of type {ArtifactRelation.Relations.EXTENDS} from "
f'"{artifact}" found'
)
case _ as unreachable: # pragma: no cover
raise AssertionError(f"Unexpected artifact category: {unreachable}")
[docs]
def locate_debian_source_package_lookup(
    workflow: "Workflow[Any, Any]", configuration_key: str, lookup: LookupSingle
) -> LookupSingle:
    """
    Turn ``lookup`` into a lookup for a :artifact:`debian:source-package`.

    ``lookup`` is resolved to a single artifact.  If that artifact is a
    :artifact:`debian:upload`, the related
    :artifact:`debian:source-package` is located and a lookup of the form
    ``<id>@artifacts`` pointing at it is returned.  Otherwise the
    artifact's category is checked against
    :artifact:`debian:source-package` and ``lookup`` is returned
    unchanged.
    """
    resolved = lookup_single(
        lookup,
        workflow.workspace,
        user=workflow.work_request.created_by,
        workflow_root=workflow.work_request.workflow_root,
        expect_type=LookupChildType.ARTIFACT,
    )
    artifact = resolved.artifact

    if artifact.category != ArtifactCategory.UPLOAD:
        BaseTask.ensure_artifact_categories(
            configuration_key=configuration_key,
            category=artifact.category,
            expected=[ArtifactCategory.SOURCE_PACKAGE],
        )
        return lookup

    located = locate_debian_source_package(configuration_key, artifact)
    return f"{located.id}@artifacts"
[docs]
def locate_debian_binary_packages(
    configuration_key: str, artifacts: Sequence[Artifact]
) -> list[Artifact]:
    """
    Expand ``artifacts`` into :artifact:`debian:binary-package` artifacts.

    :param configuration_key: The key in the workflow's task data from which
      the given artifacts were looked up.  The position of each artifact is
      appended as ``[i]`` when its category is checked.
    :param artifacts: A sequence of :artifact:`debian:binary-package` or
      :artifact:`debian:upload` artifacts.
    :return: A list in input order, where each
      :artifact:`debian:binary-package` is kept as is and each
      :artifact:`debian:upload` is replaced by all the
      :artifact:`debian:binary-package` artifacts it extends.
    """
    located: list[Artifact] = []
    for index, artifact in enumerate(artifacts):
        BaseTask.ensure_artifact_categories(
            configuration_key=f"{configuration_key}[{index}]",
            category=artifact.category,
            expected=[ArtifactCategory.BINARY_PACKAGE, ArtifactCategory.UPLOAD],
        )
        category = artifact.category
        if category == ArtifactCategory.BINARY_PACKAGE:
            located.append(artifact)
        elif category == ArtifactCategory.UPLOAD:
            located.extend(
                follow_artifact_relations(
                    artifact,
                    ArtifactRelation.Relations.EXTENDS,
                    ArtifactCategory.BINARY_PACKAGE,
                )
            )
        else:  # pragma: no cover
            raise AssertionError(f"Unexpected artifact category: {category}")
    return located
[docs]
def get_source_package_names(
    results: Sequence[LookupResult],
    *,
    configuration_key: str,
    artifact_expected_categories: AbcCollection[ArtifactCategory],
) -> list[str]:
    """
    Collect the distinct source package names of ``results``, sorted.

    Each result is handled as either an artifact or a promise:

    - Artifact: its category is checked against
      ``artifact_expected_categories`` and the name is extracted from its
      typed data with ``get_source_package_name``.
    - Promise: its ``promise_category`` is checked the same way and the
      name is taken from ``source_package_name`` in the promise data, if
      present.

    Internal invariants (result kind, presence of the artifact or
    collection item, type of the artifact data) are enforced with
    ``assert``.

    :param results: A sequence of :py:class:`LookupResult` objects, each
      expected to be either an artifact or a promise.
    :param configuration_key: A string passed to
      :py:meth:`BaseTask.ensure_artifact_categories`.
    :param artifact_expected_categories: valid :py:class:`ArtifactCategory`
      values that artifacts and promises must belong to.
    :return: A sorted list of source package names.
    """
    names = set()
    for result in results:
        if result.result_type == CollectionItem.Types.ARTIFACT:
            artifact = result.artifact
            assert artifact is not None
            BaseTask.ensure_artifact_categories(
                configuration_key=configuration_key,
                category=artifact.category,
                expected=artifact_expected_categories,
            )
            data = artifact.create_data()
            assert isinstance(
                data,
                (
                    DebianSourcePackage,
                    DebianUpload,
                    DebianBinaryPackage,
                    DebianPackageBuildLog,
                ),
            )
            names.add(get_source_package_name(data))
            continue

        # Lookups are made expecting only artifacts or promises, so
        # anything that is not an artifact has to be a promise.
        assert result.result_type == CollectionItem.Types.BARE
        item = result.collection_item
        assert item is not None
        BaseTask.ensure_artifact_categories(
            configuration_key=configuration_key,
            category=item.data["promise_category"],
            expected=artifact_expected_categories,
        )
        promised_name = item.data.get("source_package_name")
        if promised_name is not None:
            names.add(promised_name)
    return sorted(names)
[docs]
def get_available_architectures(
    workflow: "Workflow[Any, Any]", *, vendor: str, codename: str
) -> set[str]:
    """
    Return the architectures usable with this vendor/codename.

    Artifacts are looked up in the ``vendor`` collection (environments
    category by default) with a matching ``codename``, and the
    ``architecture`` of each one's data is collected.  ``"all"`` is always
    added to a non-empty result.

    :raises TaskConfigError: if the lookup yields no environments.
    """
    environments = lookup_multiple(
        LookupMultiple.parse_obj(
            {"collection": vendor, "data__codename": codename}
        ),
        workflow.workspace,
        user=workflow.work_request.created_by,
        default_category=CollectionCategory.ENVIRONMENTS,
        expect_type=LookupChildType.ARTIFACT,
    )
    found = {
        result.artifact.data.get("architecture") for result in environments
    }
    if not found:
        raise TaskConfigError(
            f"Unable to find any environments for {vendor}:{codename}"
        )
    return found | {"all"}