Make change and version bump to AP4A.240925.001

Snap for 12406339 from 1911f735a4 to 24Q4-release

Change-Id: I24a098bf1c57d7821558a5f895cc094b725e4bc7
This commit is contained in:
Android Build Coastguard Worker
2024-09-24 17:33:36 +00:00
7 changed files with 676 additions and 192 deletions

View File

@@ -18,4 +18,4 @@
# (like "CRB01"). It must be a single word, and is
# capitalized by convention.
BUILD_ID=AP4A.240924.004
BUILD_ID=AP4A.240925.001

View File

@@ -19,3 +19,26 @@ package {
default_applicable_licenses: ["Android-Apache-2.0"],
default_team: "trendy_team_adte",
}
python_library_host {
name: "edit_monitor_lib",
pkg_path: "edit_monitor",
srcs: [
"daemon_manager.py",
],
}
python_test_host {
name: "daemon_manager_test",
main: "daemon_manager_test.py",
pkg_path: "edit_monitor",
srcs: [
"daemon_manager_test.py",
],
libs: [
"edit_monitor_lib",
],
test_options: {
unit_test: true,
},
}

View File

@@ -0,0 +1,182 @@
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import logging
import multiprocessing
import os
import pathlib
import signal
import subprocess
import tempfile
import time
DEFAULT_PROCESS_TERMINATION_TIMEOUT_SECONDS = 1
def default_daemon_target():
"""Place holder for the default daemon target."""
print("default daemon target")
class DaemonManager:
"""Class to manage and monitor the daemon run as a subprocess."""
def __init__(
self,
binary_path: str,
daemon_target: callable = default_daemon_target,
daemon_args: tuple = (),
):
self.binary_path = binary_path
self.daemon_target = daemon_target
self.daemon_args = daemon_args
self.pid = os.getpid()
self.daemon_process = None
pid_file_dir = pathlib.Path(tempfile.gettempdir()).joinpath("edit_monitor")
pid_file_dir.mkdir(parents=True, exist_ok=True)
self.pid_file_path = self._get_pid_file_path(pid_file_dir)
def start(self):
"""Writes the pidfile and starts the daemon proces."""
try:
self._stop_any_existing_instance()
self._write_pid_to_pidfile()
self._start_daemon_process()
except Exception as e:
logging.exception("Failed to start daemon manager with error %s", e)
def stop(self):
"""Stops the daemon process and removes the pidfile."""
logging.debug("in daemon manager cleanup.")
try:
if self.daemon_process and self.daemon_process.is_alive():
self._terminate_process(self.daemon_process.pid)
self._remove_pidfile()
except Exception as e:
logging.exception("Failed to stop daemon manager with error %s", e)
def _stop_any_existing_instance(self):
if not self.pid_file_path.exists():
logging.debug("No existing instances.")
return
ex_pid = self._read_pid_from_pidfile()
if ex_pid:
logging.info("Found another instance with pid %d.", ex_pid)
self._terminate_process(ex_pid)
self._remove_pidfile()
def _read_pid_from_pidfile(self):
with open(self.pid_file_path, "r") as f:
return int(f.read().strip())
def _write_pid_to_pidfile(self):
"""Creates a pidfile and writes the current pid to the file.
Raise FileExistsError if the pidfile already exists.
"""
try:
# Use the 'x' mode to open the file for exclusive creation
with open(self.pid_file_path, "x") as f:
f.write(f"{self.pid}")
except FileExistsError as e:
# This could be caused due to race condition that a user is trying
# to start two edit monitors at the same time. Or because there is
# already an existing edit monitor running and we can not kill it
# for some reason.
logging.exception("pidfile %s already exists.", self.pid_file_path)
raise e
def _start_daemon_process(self):
"""Starts a subprocess to run the daemon."""
p = multiprocessing.Process(
target=self.daemon_target, args=self.daemon_args
)
p.start()
logging.info("Start subprocess with PID %d", p.pid)
self.daemon_process = p
def _terminate_process(
self, pid: int, timeout: int = DEFAULT_PROCESS_TERMINATION_TIMEOUT_SECONDS
):
"""Terminates a process with given pid.
It first sends a SIGTERM to the process to allow it for proper
termination with a timeout. If the process is not terminated within
the timeout, kills it forcefully.
"""
try:
os.kill(pid, signal.SIGTERM)
if not self._wait_for_process_terminate(pid, timeout):
logging.warning(
"Process %d not terminated within timeout, try force kill", pid
)
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
logging.info("Process with PID %d not found (already terminated)", pid)
def _wait_for_process_terminate(self, pid: int, timeout: int) -> bool:
start_time = time.time()
while time.time() < start_time + timeout:
if not self._is_process_alive(pid):
return True
time.sleep(1)
logging.error("Process %d not terminated within %d seconds.", pid, timeout)
return False
def _is_process_alive(self, pid: int) -> bool:
try:
output = subprocess.check_output(
["ps", "-p", str(pid), "-o", "state="], text=True
).strip()
state = output.split()[0]
return state != "Z" # Check if the state is not 'Z' (zombie)
except subprocess.CalledProcessError:
# Process not found (already dead).
return False
except (FileNotFoundError, OSError, ValueError) as e:
logging.warning(
"Unable to check the status for process %d with error: %s.", pid, e
)
return True
def _remove_pidfile(self):
try:
os.remove(self.pid_file_path)
except FileNotFoundError:
logging.info("pid file %s already removed.", self.pid_file_path)
def _get_pid_file_path(self, pid_file_dir: pathlib.Path) -> pathlib.Path:
"""Generates the path to store the pidfile.
The file path should have the format of "/tmp/edit_monitor/xxxx.lock"
where xxxx is a hashed value based on the binary path that starts the
process.
"""
hash_object = hashlib.sha256()
hash_object.update(self.binary_path.encode("utf-8"))
pid_file_path = pid_file_dir.joinpath(hash_object.hexdigest() + ".lock")
logging.info("pid_file_path: %s", pid_file_path)
return pid_file_path

View File

@@ -0,0 +1,253 @@
# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for DaemonManager."""
import logging
import multiprocessing
import os
import pathlib
import signal
import subprocess
import sys
import tempfile
import time
import unittest
from unittest import mock
from edit_monitor import daemon_manager
TEST_BINARY_FILE = '/path/to/test_binary'
TEST_PID_FILE_PATH = (
'587239c2d1050afdf54512e2d799f3b929f86b43575eb3c7b4bab105dd9bd25e.lock'
)
def simple_daemon(output_file):
with open(output_file, 'w') as f:
f.write('running daemon target')
def long_running_daemon():
while True:
time.sleep(1)
class DaemonManagerTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
# Configure to print logging to stdout.
logging.basicConfig(filename=None, level=logging.DEBUG)
console = logging.StreamHandler(sys.stdout)
logging.getLogger('').addHandler(console)
def setUp(self):
super().setUp()
self.original_tempdir = tempfile.tempdir
self.working_dir = tempfile.TemporaryDirectory()
# Sets the tempdir under the working dir so any temp files created during
# tests will be cleaned.
tempfile.tempdir = self.working_dir.name
def tearDown(self):
# Cleans up any child processes left by the tests.
self._cleanup_child_processes()
self.working_dir.cleanup()
# Restores tempdir.
tempfile.tempdir = self.original_tempdir
super().tearDown()
def test_start_success_with_no_existing_instance(self):
self.assert_run_simple_daemon_success()
def test_start_success_with_existing_instance_running(self):
# Create a long running subprocess
p = multiprocessing.Process(target=long_running_daemon)
p.start()
# Create a pidfile with the subprocess pid
pid_file_path_dir = pathlib.Path(self.working_dir.name).joinpath(
'edit_monitor'
)
pid_file_path_dir.mkdir(parents=True, exist_ok=True)
with open(pid_file_path_dir.joinpath(TEST_PID_FILE_PATH), 'w') as f:
f.write(str(p.pid))
self.assert_run_simple_daemon_success()
p.terminate()
def test_start_success_with_existing_instance_already_dead(self):
# Create a pidfile with pid that does not exist.
pid_file_path_dir = pathlib.Path(self.working_dir.name).joinpath(
'edit_monitor'
)
pid_file_path_dir.mkdir(parents=True, exist_ok=True)
with open(pid_file_path_dir.joinpath(TEST_PID_FILE_PATH), 'w') as f:
f.write('123456')
self.assert_run_simple_daemon_success()
def test_start_success_with_existing_instance_from_different_binary(self):
# First start an instance based on "some_binary_path"
existing_dm = daemon_manager.DaemonManager(
"some_binary_path",
daemon_target=long_running_daemon,
)
existing_dm.start()
self.assert_run_simple_daemon_success()
existing_dm.stop()
@mock.patch('os.kill')
def test_start_failed_to_kill_existing_instance(self, mock_kill):
mock_kill.side_effect = OSError('Unknown OSError')
pid_file_path_dir = pathlib.Path(self.working_dir.name).joinpath(
'edit_monitor'
)
pid_file_path_dir.mkdir(parents=True, exist_ok=True)
with open(pid_file_path_dir.joinpath(TEST_PID_FILE_PATH), 'w') as f:
f.write('123456')
dm = daemon_manager.DaemonManager(TEST_BINARY_FILE)
dm.start()
# Verify no daemon process is started.
self.assertIsNone(dm.daemon_process)
def test_start_failed_to_write_pidfile(self):
pid_file_path_dir = pathlib.Path(self.working_dir.name).joinpath(
'edit_monitor'
)
pid_file_path_dir.mkdir(parents=True, exist_ok=True)
# Makes the directory read-only so write pidfile will fail.
os.chmod(pid_file_path_dir, 0o555)
dm = daemon_manager.DaemonManager(TEST_BINARY_FILE)
dm.start()
# Verifies no daemon process is started.
self.assertIsNone(dm.daemon_process)
def test_start_failed_to_start_daemon_process(self):
dm = daemon_manager.DaemonManager(
TEST_BINARY_FILE, daemon_target='wrong_target', daemon_args=(1)
)
dm.start()
# Verifies no daemon process is started.
self.assertIsNone(dm.daemon_process)
def test_stop_success(self):
dm = daemon_manager.DaemonManager(
TEST_BINARY_FILE, daemon_target=long_running_daemon
)
dm.start()
dm.stop()
self.assert_no_subprocess_running()
self.assertFalse(dm.pid_file_path.exists())
@mock.patch('os.kill')
def test_stop_failed_to_kill_daemon_process(self, mock_kill):
mock_kill.side_effect = OSError('Unknown OSError')
dm = daemon_manager.DaemonManager(
TEST_BINARY_FILE, daemon_target=long_running_daemon
)
dm.start()
dm.stop()
self.assertTrue(dm.daemon_process.is_alive())
self.assertTrue(dm.pid_file_path.exists())
@mock.patch('os.remove')
def test_stop_failed_to_remove_pidfile(self, mock_remove):
mock_remove.side_effect = OSError('Unknown OSError')
dm = daemon_manager.DaemonManager(
TEST_BINARY_FILE, daemon_target=long_running_daemon
)
dm.start()
dm.stop()
self.assert_no_subprocess_running()
self.assertTrue(dm.pid_file_path.exists())
def assert_run_simple_daemon_success(self):
damone_output_file = tempfile.NamedTemporaryFile(
dir=self.working_dir.name, delete=False
)
dm = daemon_manager.DaemonManager(
TEST_BINARY_FILE,
daemon_target=simple_daemon,
daemon_args=(damone_output_file.name,),
)
dm.start()
dm.daemon_process.join()
# Verifies the expected pid file is created.
expected_pid_file_path = pathlib.Path(self.working_dir.name).joinpath(
'edit_monitor', TEST_PID_FILE_PATH
)
self.assertTrue(expected_pid_file_path.exists())
# Verify the daemon process is executed successfully.
with open(damone_output_file.name, 'r') as f:
contents = f.read()
self.assertEqual(contents, 'running daemon target')
def assert_no_subprocess_running(self):
child_pids = self._get_child_processes(os.getpid())
for child_pid in child_pids:
self.assertFalse(
self._is_process_alive(child_pid), f'process {child_pid} still alive'
)
def _get_child_processes(self, parent_pid):
try:
output = subprocess.check_output(
['ps', '-o', 'pid,ppid', '--no-headers'], text=True
)
child_processes = []
for line in output.splitlines():
pid, ppid = line.split()
if int(ppid) == parent_pid:
child_processes.append(int(pid))
return child_processes
except subprocess.CalledProcessError as e:
self.fail(f'failed to get child process, error: {e}')
def _is_process_alive(self, pid):
try:
output = subprocess.check_output(
['ps', '-p', str(pid), '-o', 'state='], text=True
).strip()
state = output.split()[0]
return state != 'Z' # Check if the state is not 'Z' (zombie)
except subprocess.CalledProcessError:
return False
def _cleanup_child_processes(self):
child_pids = self._get_child_processes(os.getpid())
for child_pid in child_pids:
try:
os.kill(child_pid, signal.SIGKILL)
except ProcessLookupError:
# process already terminated
pass
if __name__ == '__main__':
unittest.main()

View File

@@ -33,6 +33,13 @@ python_binary_host {
],
}
python_library_host {
name: "compliance_metadata",
srcs: [
"compliance_metadata.py",
],
}
python_binary_host {
name: "gen_sbom",
srcs: [
@@ -44,6 +51,7 @@ python_binary_host {
},
},
libs: [
"compliance_metadata",
"metadata_file_proto_py",
"libprotobuf-python",
"sbom_lib",

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
#
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlite3
class MetadataDb:
def __init__(self, db):
self.conn = sqlite3.connect(':memory')
self.conn.row_factory = sqlite3.Row
with sqlite3.connect(db) as c:
c.backup(self.conn)
self.reorg()
def reorg(self):
# package_license table
self.conn.execute("create table package_license as "
"select name as package, pkg_default_applicable_licenses as license "
"from modules "
"where module_type = 'package' ")
cursor = self.conn.execute("select package,license from package_license where license like '% %'")
multi_licenses_packages = cursor.fetchall()
cursor.close()
rows = []
for p in multi_licenses_packages:
licenses = p['license'].strip().split(' ')
for lic in licenses:
rows.append((p['package'], lic))
self.conn.executemany('insert into package_license values (?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from package_license where license like '% %'")
self.conn.commit()
# module_license table
self.conn.execute("create table module_license as "
"select distinct name as module, package, licenses as license "
"from modules "
"where licenses != '' ")
cursor = self.conn.execute("select module,package,license from module_license where license like '% %'")
multi_licenses_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_licenses_modules:
licenses = m['license'].strip().split(' ')
for lic in licenses:
rows.append((m['module'], m['package'],lic))
self.conn.executemany('insert into module_license values (?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_license where license like '% %'")
self.conn.commit()
# module_installed_file table
self.conn.execute("create table module_installed_file as "
"select id as module_id, name as module_name, package, installed_files as installed_file "
"from modules "
"where installed_files != '' ")
cursor = self.conn.execute("select module_id, module_name, package, installed_file "
"from module_installed_file where installed_file like '% %'")
multi_installed_file_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_installed_file_modules:
installed_files = m['installed_file'].strip().split(' ')
for f in installed_files:
rows.append((m['module_id'], m['module_name'], m['package'], f))
self.conn.executemany('insert into module_installed_file values (?, ?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_installed_file where installed_file like '% %'")
self.conn.commit()
# module_built_file table
self.conn.execute("create table module_built_file as "
"select id as module_id, name as module_name, package, built_files as built_file "
"from modules "
"where built_files != '' ")
cursor = self.conn.execute("select module_id, module_name, package, built_file "
"from module_built_file where built_file like '% %'")
multi_built_file_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_built_file_modules:
built_files = m['installed_file'].strip().split(' ')
for f in built_files:
rows.append((m['module_id'], m['module_name'], m['package'], f))
self.conn.executemany('insert into module_built_file values (?, ?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_built_file where built_file like '% %'")
self.conn.commit()
# Indexes
self.conn.execute('create index idx_modules_id on modules (id)')
self.conn.execute('create index idx_modules_name on modules (name)')
self.conn.execute('create index idx_package_licnese_package on package_license (package)')
self.conn.execute('create index idx_package_licnese_license on package_license (license)')
self.conn.execute('create index idx_module_licnese_module on module_license (module)')
self.conn.execute('create index idx_module_licnese_license on module_license (license)')
self.conn.execute('create index idx_module_installed_file_module_id on module_installed_file (module_id)')
self.conn.execute('create index idx_module_installed_file_installed_file on module_installed_file (installed_file)')
self.conn.execute('create index idx_module_built_file_module_id on module_built_file (module_id)')
self.conn.execute('create index idx_module_built_file_built_file on module_built_file (built_file)')
self.conn.commit()
def dump_debug_db(self, debug_db):
with sqlite3.connect(debug_db) as c:
self.conn.backup(c)
def get_installed_files(self):
# Get all records from table make_metadata, which contains all installed files and corresponding make modules' metadata
cursor = self.conn.execute('select installed_file, module_path, is_prebuilt_make_module, product_copy_files, kernel_module_copy_files, is_platform_generated, license_text from make_metadata')
rows = cursor.fetchall()
cursor.close()
installed_files_metadata = []
for row in rows:
metadata = dict(zip(row.keys(), row))
installed_files_metadata.append(metadata)
return installed_files_metadata
def get_soong_modules(self):
# Get all records from table modules, which contains metadata of all soong modules
cursor = self.conn.execute('select name, package, package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files from modules')
rows = cursor.fetchall()
cursor.close()
soong_modules = []
for row in rows:
soong_module = dict(zip(row.keys(), row))
soong_modules.append(soong_module)
return soong_modules
def get_package_licenses(self, package):
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from package_license pl join modules m on pl.license = m.name '
'where pl.package = ?',
('//' + package,))
rows = cursor.fetchall()
licenses = {}
for r in rows:
licenses[r['name']] = r['license_text']
return licenses
def get_module_licenses(self, module_name, package):
licenses = {}
# If property "licenses" is defined on module
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from module_license ml join modules m on ml.license = m.name '
'where ml.module = ? and ml.package = ?',
(module_name, package))
rows = cursor.fetchall()
for r in rows:
licenses[r['name']] = r['license_text']
if len(licenses) > 0:
return licenses
# Use default package license
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from package_license pl join modules m on pl.license = m.name '
'where pl.package = ?',
('//' + package,))
rows = cursor.fetchall()
for r in rows:
licenses[r['name']] = r['license_text']
return licenses
def get_soong_module_of_installed_file(self, installed_file):
cursor = self.conn.execute('select name, m.package, m.package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files '
'from modules m join module_installed_file mif on m.id = mif.module_id '
'where mif.installed_file = ?',
(installed_file,))
rows = cursor.fetchall()
cursor.close()
if rows:
soong_module = dict(zip(rows[0].keys(), rows[0]))
return soong_module
return None
def get_soong_module_of_built_file(self, built_file):
cursor = self.conn.execute('select name, m.package, m.package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files '
'from modules m join module_built_file mbf on m.id = mbf.module_id '
'where mbf.built_file = ?',
(built_file,))
rows = cursor.fetchall()
cursor.close()
if rows:
soong_module = dict(zip(rows[0].keys(), rows[0]))
return soong_module
return None

View File

@@ -26,6 +26,7 @@ Usage example:
"""
import argparse
import compliance_metadata
import datetime
import google.protobuf.text_format as text_format
import hashlib
@@ -35,7 +36,6 @@ import queue
import metadata_file_pb2
import sbom_data
import sbom_writers
import sqlite3
# Package type
PKG_SOURCE = 'SOURCE'
@@ -568,202 +568,16 @@ def get_all_transitive_static_dep_files_of_installed_files(installed_files_metad
return sorted(all_static_dep_files.keys())
class MetadataDb:
def __init__(self, db):
self.conn = sqlite3.connect(':memory')
self.conn.row_factory = sqlite3.Row
with sqlite3.connect(db) as c:
c.backup(self.conn)
self.reorg()
def reorg(self):
# package_license table
self.conn.execute("create table package_license as "
"select name as package, pkg_default_applicable_licenses as license "
"from modules "
"where module_type = 'package' ")
cursor = self.conn.execute("select package,license from package_license where license like '% %'")
multi_licenses_packages = cursor.fetchall()
cursor.close()
rows = []
for p in multi_licenses_packages:
licenses = p['license'].strip().split(' ')
for lic in licenses:
rows.append((p['package'], lic))
self.conn.executemany('insert into package_license values (?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from package_license where license like '% %'")
self.conn.commit()
# module_license table
self.conn.execute("create table module_license as "
"select distinct name as module, package, licenses as license "
"from modules "
"where licenses != '' ")
cursor = self.conn.execute("select module,package,license from module_license where license like '% %'")
multi_licenses_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_licenses_modules:
licenses = m['license'].strip().split(' ')
for lic in licenses:
rows.append((m['module'], m['package'],lic))
self.conn.executemany('insert into module_license values (?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_license where license like '% %'")
self.conn.commit()
# module_installed_file table
self.conn.execute("create table module_installed_file as "
"select id as module_id, name as module_name, package, installed_files as installed_file "
"from modules "
"where installed_files != '' ")
cursor = self.conn.execute("select module_id, module_name, package, installed_file "
"from module_installed_file where installed_file like '% %'")
multi_installed_file_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_installed_file_modules:
installed_files = m['installed_file'].strip().split(' ')
for f in installed_files:
rows.append((m['module_id'], m['module_name'], m['package'], f))
self.conn.executemany('insert into module_installed_file values (?, ?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_installed_file where installed_file like '% %'")
self.conn.commit()
# module_built_file table
self.conn.execute("create table module_built_file as "
"select id as module_id, name as module_name, package, built_files as built_file "
"from modules "
"where built_files != '' ")
cursor = self.conn.execute("select module_id, module_name, package, built_file "
"from module_built_file where built_file like '% %'")
multi_built_file_modules = cursor.fetchall()
cursor.close()
rows = []
for m in multi_built_file_modules:
built_files = m['installed_file'].strip().split(' ')
for f in built_files:
rows.append((m['module_id'], m['module_name'], m['package'], f))
self.conn.executemany('insert into module_built_file values (?, ?, ?, ?)', rows)
self.conn.commit()
self.conn.execute("delete from module_built_file where built_file like '% %'")
self.conn.commit()
# Indexes
self.conn.execute('create index idx_modules_id on modules (id)')
self.conn.execute('create index idx_modules_name on modules (name)')
self.conn.execute('create index idx_package_licnese_package on package_license (package)')
self.conn.execute('create index idx_package_licnese_license on package_license (license)')
self.conn.execute('create index idx_module_licnese_module on module_license (module)')
self.conn.execute('create index idx_module_licnese_license on module_license (license)')
self.conn.execute('create index idx_module_installed_file_module_id on module_installed_file (module_id)')
self.conn.execute('create index idx_module_installed_file_installed_file on module_installed_file (installed_file)')
self.conn.execute('create index idx_module_built_file_module_id on module_built_file (module_id)')
self.conn.execute('create index idx_module_built_file_built_file on module_built_file (built_file)')
self.conn.commit()
if args.debug:
with sqlite3.connect(os.path.dirname(args.metadata) + '/compliance-metadata-debug.db') as c:
self.conn.backup(c)
def get_installed_files(self):
# Get all records from table make_metadata, which contains all installed files and corresponding make modules' metadata
cursor = self.conn.execute('select installed_file, module_path, is_prebuilt_make_module, product_copy_files, kernel_module_copy_files, is_platform_generated, license_text from make_metadata')
rows = cursor.fetchall()
cursor.close()
installed_files_metadata = []
for row in rows:
metadata = dict(zip(row.keys(), row))
installed_files_metadata.append(metadata)
return installed_files_metadata
def get_soong_modules(self):
# Get all records from table modules, which contains metadata of all soong modules
cursor = self.conn.execute('select name, package, package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files from modules')
rows = cursor.fetchall()
cursor.close()
soong_modules = []
for row in rows:
soong_module = dict(zip(row.keys(), row))
soong_modules.append(soong_module)
return soong_modules
def get_package_licenses(self, package):
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from package_license pl join modules m on pl.license = m.name '
'where pl.package = ?',
('//' + package,))
rows = cursor.fetchall()
licenses = {}
for r in rows:
licenses[r['name']] = r['license_text']
return licenses
def get_module_licenses(self, module_name, package):
licenses = {}
# If property "licenses" is defined on module
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from module_license ml join modules m on ml.license = m.name '
'where ml.module = ? and ml.package = ?',
(module_name, package))
rows = cursor.fetchall()
for r in rows:
licenses[r['name']] = r['license_text']
if len(licenses) > 0:
return licenses
# Use default package license
cursor = self.conn.execute('select m.name, m.package, m.lic_license_text as license_text '
'from package_license pl join modules m on pl.license = m.name '
'where pl.package = ?',
('//' + package,))
rows = cursor.fetchall()
for r in rows:
licenses[r['name']] = r['license_text']
return licenses
def get_soong_module_of_installed_file(self, installed_file):
cursor = self.conn.execute('select name, m.package, m.package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files '
'from modules m join module_installed_file mif on m.id = mif.module_id '
'where mif.installed_file = ?',
(installed_file,))
rows = cursor.fetchall()
cursor.close()
if rows:
soong_module = dict(zip(rows[0].keys(), rows[0]))
return soong_module
return None
def get_soong_module_of_built_file(self, built_file):
cursor = self.conn.execute('select name, m.package, m.package as module_path, module_type as soong_module_type, built_files, installed_files, static_dep_files, whole_static_dep_files '
'from modules m join module_built_file mbf on m.id = mbf.module_id '
'where mbf.built_file = ?',
(built_file,))
rows = cursor.fetchall()
cursor.close()
if rows:
soong_module = dict(zip(rows[0].keys(), rows[0]))
return soong_module
return None
def main():
global args
args = get_args()
log('Args:', vars(args))
global db
db = MetadataDb(args.metadata)
db = compliance_metadata.MetadataDb(args.metadata)
if args.debug:
db.dump_debug_db(os.path.dirname(args.output_file) + '/compliance-metadata-debug.db')
global metadata_file_protos
metadata_file_protos = {}
global licenses_text