Refactor package_outputs in the TestOptimizer so it just returns a list
of soong_zip commands to be run by build_test_suites.

Since build_test_suites.py already has a tested implementation for
running subprocesses, there is no reason to reimplement it in
optimized_targets.py. Because any packaging ultimately uses soong_zip to
package its final outputs, change the code to do whatever preparation it
needs and then return a list of soong_zip commands. This keeps the code
simpler to test, since the tests need no subprocesses and no
subprocess-running code has to be reimplemented.

Test: atest build_test_suites_test; atest optimized_targets_test
Bug: 358215235
Change-Id: I3025aefeeb7186f537266a72d8422211ca9835ba
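For reference, a minimal sketch of the new contract. This is illustrative
only: `ExampleOptimizer`, the soong_zip flag layout, and the `package`
driver are assumptions, and other required overrides are omitted;
`get_package_outputs_commands_impl` and `build_test_suites.run_command`
are the hooks the tests below exercise.

    import build_test_suites
    import optimized_targets


    class ExampleOptimizer(optimized_targets.OptimizedBuildTarget):
      """Hypothetical optimizer, shown only to illustrate the contract."""

      def get_package_outputs_commands_impl(self) -> list[list[str]]:
        # Do any preparation (staging outputs, writing file lists) here,
        # then describe the zip step instead of spawning it.
        return [['soong_zip', '-o', 'example.zip', '-D', 'staging_dir']]


    def package(optimizer: optimized_targets.OptimizedBuildTarget) -> None:
      # build_test_suites runs each returned command with its existing,
      # already-tested subprocess wrapper.
      for command in optimizer.get_package_outputs_commands():
        build_test_suites.run_command(command)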
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for build_test_suites.py"""

import argparse
import functools
from importlib import resources
import json
import multiprocessing
import os
import pathlib
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import textwrap
import time
from typing import Any, Callable
import unittest
from unittest import mock
from build_context import BuildContext
import build_test_suites
import ci_test_lib
import optimized_targets
from pyfakefs import fake_filesystem_unittest


class BuildTestSuitesTest(fake_filesystem_unittest.TestCase):

  def setUp(self):
    self.setUpPyfakefs()

    os_environ_patcher = mock.patch.dict('os.environ', {})
    self.addCleanup(os_environ_patcher.stop)
    self.mock_os_environ = os_environ_patcher.start()

    subprocess_run_patcher = mock.patch('subprocess.run')
    self.addCleanup(subprocess_run_patcher.stop)
    self.mock_subprocess_run = subprocess_run_patcher.start()

    self._setup_working_build_env()

  def test_missing_target_release_env_var_raises(self):
    del os.environ['TARGET_RELEASE']

    with self.assert_raises_word(build_test_suites.Error, 'TARGET_RELEASE'):
      build_test_suites.main([])

  def test_missing_target_product_env_var_raises(self):
    del os.environ['TARGET_PRODUCT']

    with self.assert_raises_word(build_test_suites.Error, 'TARGET_PRODUCT'):
      build_test_suites.main([])

  def test_missing_top_env_var_raises(self):
    del os.environ['TOP']

    with self.assert_raises_word(build_test_suites.Error, 'TOP'):
      build_test_suites.main([])

  def test_invalid_arg_raises(self):
    invalid_args = ['--invalid_arg']

    with self.assertRaisesRegex(SystemExit, '2'):
      build_test_suites.main(invalid_args)

  def test_build_failure_returns(self):
    self.mock_subprocess_run.side_effect = subprocess.CalledProcessError(
        42, None
    )

    with self.assertRaisesRegex(SystemExit, '42'):
      build_test_suites.main([])

  def test_incorrectly_formatted_build_context_raises(self):
    build_context = self.fake_top.joinpath('build_context')
    build_context.touch()
    os.environ['BUILD_CONTEXT'] = str(build_context)

    with self.assert_raises_word(build_test_suites.Error, 'JSON'):
      build_test_suites.main([])

  def test_build_success_returns(self):
    with self.assertRaisesRegex(SystemExit, '0'):
      build_test_suites.main([])

  def assert_raises_word(self, cls, word):
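    """Asserts `cls` is raised with `word` appearing as a whole word."""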
    return self.assertRaisesRegex(cls, rf'\b{word}\b')

  def _setup_working_build_env(self):
    self.fake_top = pathlib.Path('/fake/top')
    self.fake_top.mkdir(parents=True)

    self.soong_ui_dir = self.fake_top.joinpath('build/soong')
    self.soong_ui_dir.mkdir(parents=True, exist_ok=True)

    self.soong_ui = self.soong_ui_dir.joinpath('soong_ui.bash')
    self.soong_ui.touch()

    self.mock_os_environ.update({
        'TARGET_RELEASE': 'release',
        'TARGET_PRODUCT': 'product',
        'TOP': str(self.fake_top),
    })

    self.mock_subprocess_run.return_value = 0


class RunCommandIntegrationTest(ci_test_lib.TestCase):

  def setUp(self):
    self.temp_dir = ci_test_lib.TestTemporaryDirectory.create(self)

    # Copy the Python executable from 'non-code' resources and make it
    # executable for use by tests that launch a subprocess. Note that we
    # don't use Python's native `sys.executable` property since that is not
    # set when running via the embedded launcher.
    base_name = 'py3-cmd'
    dest_file = self.temp_dir.joinpath(base_name)
    with resources.as_file(
        resources.files('testdata').joinpath(base_name)
    ) as p:
      shutil.copy(p, dest_file)
    dest_file.chmod(dest_file.stat().st_mode | stat.S_IEXEC)
    self.python_executable = dest_file

    self._managed_processes = []

  def tearDown(self):
    self._terminate_managed_processes()

  def test_raises_on_nonzero_exit(self):
    with self.assertRaises(Exception):
      build_test_suites.run_command([
          self.python_executable,
          '-c',
          textwrap.dedent(f"""\
              import sys
              sys.exit(1)
              """),
      ])

  def test_streams_stdout(self):

    def run_slow_command(stdout_file, marker):
      with open(stdout_file, 'w') as f:
        build_test_suites.run_command(
            [
                self.python_executable,
                '-c',
                textwrap.dedent(f"""\
                    import time

                    print('{marker}', end='', flush=True)

                    # Keep process alive until we check stdout.
                    time.sleep(10)
                    """),
            ],
            stdout=f,
        )

    marker = 'Spinach'
    stdout_file = self.temp_dir.joinpath('stdout.txt')

    p = self.start_process(target=run_slow_command, args=[stdout_file, marker])

    self.assert_file_eventually_contains(stdout_file, marker)

  def test_propagates_interruptions(self):

    def run(pid_file):
      build_test_suites.run_command([
          self.python_executable,
          '-c',
          textwrap.dedent(f"""\
              import os
              import pathlib
              import time

              pathlib.Path('{pid_file}').write_text(str(os.getpid()))

              # Keep the process alive for us to explicitly interrupt it.
              time.sleep(10)
              """),
      ])

    pid_file = self.temp_dir.joinpath('pid.txt')
    p = self.start_process(target=run, args=[pid_file])
    subprocess_pid = int(read_eventual_file_contents(pid_file))

    os.kill(p.pid, signal.SIGINT)
    p.join()

    self.assert_process_eventually_dies(p.pid)
    self.assert_process_eventually_dies(subprocess_pid)

  def start_process(self, *args, **kwargs) -> multiprocessing.Process:
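    """Starts a tracked process that tearDown interrupts if still alive."""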
    p = multiprocessing.Process(*args, **kwargs)
    self._managed_processes.append(p)
    p.start()
    return p

  def assert_process_eventually_dies(self, pid: int):
    try:
      wait_until(lambda: not ci_test_lib.process_alive(pid))
    except TimeoutError as e:
      self.fail(f'Process {pid} did not die after a while: {e}')

  def assert_file_eventually_contains(self, file: pathlib.Path, substring: str):
    wait_until(lambda: file.is_file() and file.stat().st_size > 0)
    self.assertIn(substring, read_file_contents(file))

  def _terminate_managed_processes(self):
    for p in self._managed_processes:
      if not p.is_alive():
        continue

      # We terminate the process with `SIGINT` since using `terminate` or
      # `SIGKILL` doesn't kill any grandchild processes and we don't have
      # `psutil` available to easily query all children.
      os.kill(p.pid, signal.SIGINT)


class BuildPlannerTest(unittest.TestCase):

  class TestOptimizedBuildTarget(optimized_targets.OptimizedBuildTarget):
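    """Test double that returns canned build targets and packaging commands."""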

    def __init__(
        self, target, build_context, args, output_targets, packaging_commands
    ):
      super().__init__(target, build_context, args)
      self.output_targets = output_targets
      self.packaging_commands = packaging_commands

    def get_build_targets_impl(self):
      return self.output_targets

    def get_package_outputs_commands_impl(self):
      return self.packaging_commands

    def get_enabled_flag(self):
      return f'{self.target}_enabled'

  def test_build_optimization_off_builds_everything(self):
    build_targets = {'target_1', 'target_2'}
    build_planner = self.create_build_planner(
        build_context=self.create_build_context(optimized_build_enabled=False),
        build_targets=build_targets,
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_targets, build_plan.build_targets)

  def test_build_optimization_off_doesnt_package(self):
    build_targets = {'target_1', 'target_2'}
    build_planner = self.create_build_planner(
        build_context=self.create_build_context(optimized_build_enabled=False),
        build_targets=build_targets,
    )

    build_plan = build_planner.create_build_plan()

    self.assertEqual(len(build_plan.packaging_commands), 0)

  def test_build_optimization_on_optimizes_target(self):
    build_targets = {'target_1', 'target_2'}
    build_planner = self.create_build_planner(
        build_targets=build_targets,
        build_context=self.create_build_context(
            enabled_build_features=[{'name': self.get_target_flag('target_1')}]
        ),
    )

    build_plan = build_planner.create_build_plan()

    expected_targets = {self.get_optimized_target_name('target_1'), 'target_2'}
    self.assertSetEqual(expected_targets, build_plan.build_targets)

  def test_build_optimization_on_packages_target(self):
    build_targets = {'target_1', 'target_2'}
    optimized_target_name = self.get_optimized_target_name('target_1')
    packaging_commands = [[f'packaging {optimized_target_name}']]
    build_planner = self.create_build_planner(
        build_targets=build_targets,
        build_context=self.create_build_context(
            enabled_build_features=[{'name': self.get_target_flag('target_1')}]
        ),
        packaging_commands=packaging_commands,
    )

    build_plan = build_planner.create_build_plan()

    self.assertIn(
        [f'packaging {optimized_target_name}'], build_plan.packaging_commands
    )

  def test_individual_build_optimization_off_doesnt_optimize(self):
    build_targets = {'target_1', 'target_2'}
    build_planner = self.create_build_planner(
        build_targets=build_targets,
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_targets, build_plan.build_targets)

  def test_individual_build_optimization_off_doesnt_package(self):
    build_targets = {'target_1', 'target_2'}
    packaging_commands = [['packaging command']]
    build_planner = self.create_build_planner(
        build_targets=build_targets,
        packaging_commands=packaging_commands,
    )

    build_plan = build_planner.create_build_plan()

    self.assertFalse(build_plan.packaging_commands)

  def test_target_output_used_target_built(self):
    build_target = 'test_target'
    build_planner = self.create_build_planner(
        build_targets={build_target},
        build_context=self.create_build_context(
            test_context=self.get_test_context(build_target),
            enabled_build_features=[{'name': 'test_target_unused_exclusion'}],
        ),
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_plan.build_targets, {build_target})

  def test_target_regex_used_target_built(self):
    build_target = 'test_target'
    test_context = self.get_test_context(build_target)
    test_context['testInfos'][0]['extraOptions'] = [{
        'key': 'additional-files-filter',
        'values': [rf'.*{build_target}.*\.zip'],
    }]
    build_planner = self.create_build_planner(
        build_targets={build_target},
        build_context=self.create_build_context(
            test_context=test_context,
            enabled_build_features=[{'name': 'test_target_unused_exclusion'}],
        ),
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_plan.build_targets, {build_target})

  def test_target_output_not_used_target_not_built(self):
    build_target = 'test_target'
    test_context = self.get_test_context(build_target)
    test_context['testInfos'][0]['extraOptions'] = []
    build_planner = self.create_build_planner(
        build_targets={build_target},
        build_context=self.create_build_context(
            test_context=test_context,
            enabled_build_features=[{'name': 'test_target_unused_exclusion'}],
        ),
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_plan.build_targets, set())

  def test_target_regex_matching_not_too_broad(self):
    build_target = 'test_target'
    test_context = self.get_test_context(build_target)
    test_context['testInfos'][0]['extraOptions'] = [{
        'key': 'additional-files-filter',
        'values': [rf'.*a{build_target}.*\.zip'],
    }]
    build_planner = self.create_build_planner(
        build_targets={build_target},
        build_context=self.create_build_context(
            test_context=test_context,
            enabled_build_features=[{'name': 'test_target_unused_exclusion'}],
        ),
    )

    build_plan = build_planner.create_build_plan()

    self.assertSetEqual(build_plan.build_targets, set())

  def create_build_planner(
      self,
      build_targets: set[str],
      build_context: BuildContext = None,
      args: argparse.Namespace = None,
      target_optimizations: dict[
          str, optimized_targets.OptimizedBuildTarget
      ] = None,
      packaging_commands: list[list[str]] = [],
  ) -> build_test_suites.BuildPlanner:
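    """Creates a BuildPlanner, defaulting any arguments left unset."""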
    if not build_context:
      build_context = self.create_build_context()
    if not args:
      args = self.create_args(extra_build_targets=build_targets)
    if not target_optimizations:
      target_optimizations = self.create_target_optimizations(
          build_context,
          build_targets,
          packaging_commands,
      )
    return build_test_suites.BuildPlanner(
        build_context, args, target_optimizations
    )

  def create_build_context(
      self,
      optimized_build_enabled: bool = True,
      enabled_build_features: list[dict[str, str]] = [],
      test_context: dict[str, Any] = {},
  ) -> BuildContext:
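    """Creates a BuildContext; 'optimized_build' is the global opt-in flag."""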
    build_context_dict = {}
    # Copy so that appending below never mutates the shared default list.
    build_context_dict['enabledBuildFeatures'] = list(enabled_build_features)
    if optimized_build_enabled:
      build_context_dict['enabledBuildFeatures'].append(
          {'name': 'optimized_build'}
      )
    build_context_dict['testContext'] = test_context
    return BuildContext(build_context_dict)

  def create_args(
      self, extra_build_targets: set[str] = set()
  ) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('extra_targets', nargs='*')
    return parser.parse_args(extra_build_targets)

  def create_target_optimizations(
      self,
      build_context: BuildContext,
      build_targets: set[str],
      packaging_commands: list[list[str]] = [],
  ):
    target_optimizations = dict()
    for target in build_targets:
      target_optimizations[target] = functools.partial(
          self.TestOptimizedBuildTarget,
          output_targets={self.get_optimized_target_name(target)},
          packaging_commands=packaging_commands,
      )

    return target_optimizations

  def get_target_flag(self, target: str):
    return f'{target}_enabled'

  def get_optimized_target_name(self, target: str):
    return f'{target}_optimized'

  def get_test_context(self, target: str):
    return {
        'testInfos': [
            {
                'name': 'atp_test',
                'target': 'test_target',
                'branch': 'branch',
                'extraOptions': [{
                    'key': 'additional-files-filter',
                    'values': [f'{target}.zip'],
                }],
                'command': '/tf/command',
                'extraBuildTargets': [
                    'extra_build_target',
                ],
            },
        ],
    }


def wait_until(
    condition_function: Callable[[], bool],
    timeout_secs: float = 3.0,
    polling_interval_secs: float = 0.1,
):
  """Waits until a condition function returns True."""

  start_time_secs = time.time()

  while not condition_function():
    if time.time() - start_time_secs > timeout_secs:
      raise TimeoutError(
          f'Condition not met within timeout: {timeout_secs} seconds'
      )

    time.sleep(polling_interval_secs)


def read_file_contents(file: pathlib.Path) -> str:
  with open(file, 'r') as f:
    return f.read()


def read_eventual_file_contents(file: pathlib.Path) -> str:
  wait_until(lambda: file.is_file() and file.stat().st_size > 0)
  return read_file_contents(file)


if __name__ == '__main__':
  ci_test_lib.main()