Writing temp files to temporary directory for best practices.
Bug: 153058129
Test: external/avb$ atest :all
Test: ./aftltool_integration_test.py
Change-Id: If3a004c998afb04b86643b96e653033f66050a35
diff --git a/aftltool.py b/aftltool.py
index b695d16..0fe2d07 100755
--- a/aftltool.py
+++ b/aftltool.py
@@ -1491,8 +1491,9 @@
process_number: The number of the processes executing the function.
submission_count: Number of total submissions to perform per
process_count.
- preserve_icp_images: Boolean to indicate if the generated vbmeta
- image files with inclusion proofs should preserved.
+ preserve_icp_images: Boolean to indicate if the generated vbmeta image
+ files with inclusion proofs should be preserved in the temporary
+ directory.
timeout: Duration in seconds before requests to the AFTL times out. A
value of 0 or None means there will be no timeout.
result_queue: Multiprocessing.Queue object for posting execution results.
@@ -1500,7 +1501,8 @@
for count in range(0, submission_count):
version_incremental = 'aftl_load_testing_{}_{}'.format(process_number,
count)
- output_file = '{}_icp.img'.format(version_incremental)
+ output_file = os.path.join(tempfile.gettempdir(),
+ '{}_icp.img'.format(version_incremental))
output = open(output_file, 'wb')
# Instrumented section.
@@ -1543,6 +1545,7 @@
submission_count: Number of total submissions to perform per
process_count.
stats_filename: Path to the stats file to write the raw execution data to.
+ If None, it will be written to the temp directory.
preserve_icp_images: Boolean to indicate if the generated vbmeta
image files with inclusion proofs should preserved.
timeout: Duration in seconds before requests to the AFTL times out. A
@@ -1557,11 +1560,13 @@
return False
if not stats_filename:
- stats_filename = 'load_test_p{}_s{}.csv'.format(process_count,
- submission_count)
+ stats_filename = os.path.join(
+ tempfile.gettempdir(),
+ 'load_test_p{}_s{}.csv'.format(process_count, submission_count))
+
stats_file = None
try:
- stats_file = open(stats_filename, 'w')
+ stats_file = open(stats_filename, 'wt')
stats_file.write('start_time,end_time,execution_time,version_incremental,'
'result\n')
except IOError as e:
diff --git a/aftltool_test.py b/aftltool_test.py
index b568dee..006deb9 100755
--- a/aftltool_test.py
+++ b/aftltool_test.py
@@ -31,6 +31,7 @@
import os
import struct
import sys
+import tempfile
import unittest
import aftltool
@@ -728,25 +729,23 @@
def test_verify_icp(self):
"""Tests verify_icp method."""
- key_file = 'transparency_log_pub_key.pem'
- with open(key_file, 'w') as f:
- f.write(self.test_aftl_pub_key)
+ with tempfile.NamedTemporaryFile('wt+') as key_file:
+ key_file.write(self.test_aftl_pub_key)
+ key_file.flush()
- # Valid ICP.
- entry = aftltool.AftlIcpEntry()
- entry.translate_response(self.test_tl_url_1, self.test_afi_resp)
- self.assertTrue(entry.verify_icp(key_file))
+ # Valid ICP.
+ entry = aftltool.AftlIcpEntry()
+ entry.translate_response(self.test_tl_url_1, self.test_afi_resp)
+ self.assertTrue(entry.verify_icp(key_file.name))
- # Invalid ICP where fw_info_leaf is not matching up with proofs.
- # pylint: disable=protected-access
- entry = aftltool.AftlIcpEntry()
- entry.translate_response(self.test_tl_url_1, self.test_afi_resp)
- fw_info_leaf_bytes = entry.fw_info_leaf._fw_info_leaf_bytes.replace(
- b'ViNzEQS', b'1234567')
- entry.fw_info_leaf._fw_info_leaf_bytes = fw_info_leaf_bytes
- self.assertFalse(entry.verify_icp(key_file))
-
- os.remove(key_file)
+ # Invalid ICP where fw_info_leaf is not matching up with proofs.
+ # pylint: disable=protected-access
+ entry = aftltool.AftlIcpEntry()
+ entry.translate_response(self.test_tl_url_1, self.test_afi_resp)
+ fw_info_leaf_bytes = entry.fw_info_leaf._fw_info_leaf_bytes.replace(
+ b'ViNzEQS', b'1234567')
+ entry.fw_info_leaf._fw_info_leaf_bytes = fw_info_leaf_bytes
+ self.assertFalse(entry.verify_icp(key_file.name))
def test_verify_vbmeta_image(self):
"""Tests the verify_vbmeta_image method."""
@@ -1156,7 +1155,6 @@
self.manufacturer_key = None
self.set_up_environment()
- self.output_filename = 'vbmeta_icp.img'
self.transparency_log_config = aftltool.TransparencyLogConfig(
self.aftl_host, self.aftl_pubkey, self.aftl_apikey)
@@ -1173,12 +1171,12 @@
}
self.info_icp_default_params = {
- 'vbmeta_image_path': self.output_filename,
+ 'vbmeta_image_path': None,
'output': io.StringIO()
}
self.verify_icp_default_params = {
- 'vbmeta_image_path': self.output_filename,
+ 'vbmeta_image_path': None,
'transparency_log_pub_keys': [self.aftl_pubkey],
'output': io.StringIO()
}
@@ -1195,24 +1193,6 @@
'timeout': None
}
- self.load_test_stats_file_p1_s1 = 'load_test_p1_s1.csv'
- self.load_test_stats_file_p2_p2 = 'load_test_p2_s2.csv'
-
- self.files_to_cleanup = [
- self.output_filename,
- self.load_test_stats_file_p1_s1,
- self.load_test_stats_file_p2_p2
- ]
-
- def tearDown(self):
- """Tears down the test bed for the unit tests."""
- for filename in self.files_to_cleanup:
- try:
- os.remove(filename)
- except OSError:
- pass
- super(AftlTestCase, self).tearDown()
-
def set_up_environment(self):
"""Sets up member variables for the particular test environment.
@@ -1355,22 +1335,25 @@
aftl = self.get_aftl_implementation(self.test_afi_resp)
# Make a VBmeta image with ICP.
- with open(self.output_filename, 'wb') as output_file:
+ with tempfile.NamedTemporaryFile('wb+') as output_file:
self.make_icp_default_params['output'] = output_file
result = aftl.make_icp_from_vbmeta(**self.make_icp_default_params)
- self.assertTrue(result)
+ output_file.flush()
+ self.assertTrue(result)
- # Checks that there is 1 ICP.
- aftl_image = aftl.get_aftl_image(self.output_filename)
- self.assertEqual(aftl_image.image_header.icp_count, 1)
+ # Checks that there is 1 ICP.
+ aftl_image = aftl.get_aftl_image(output_file.name)
+ self.assertEqual(aftl_image.image_header.icp_count, 1)
- # Verifies the generated image.
- result = aftl.verify_image_icp(**self.verify_icp_default_params)
- self.assertTrue(result)
+ # Verifies the generated image.
+ self.verify_icp_default_params['vbmeta_image_path'] = output_file.name
+ result = aftl.verify_image_icp(**self.verify_icp_default_params)
+ self.assertTrue(result)
- # Prints the image details.
- result = aftl.info_image_icp(**self.info_icp_default_params)
- self.assertTrue(result)
+ # Prints the image details.
+ self.info_icp_default_params['vbmeta_image_path'] = output_file.name
+ result = aftl.info_image_icp(**self.info_icp_default_params)
+ self.assertTrue(result)
def test_make_and_verify_icp_with_2_logs(self):
"""Tests make_icp_from_vbmeta, verify_image_icp & info_image_icp."""
@@ -1381,23 +1364,26 @@
self.transparency_log_config, self.transparency_log_config]
# Make a VBmeta image with ICP.
- with open(self.output_filename, 'wb') as output_file:
+ with tempfile.NamedTemporaryFile('wb+') as output_file:
self.make_icp_default_params['output'] = output_file
result = aftl.make_icp_from_vbmeta(
**self.make_icp_default_params)
+ output_file.flush()
self.assertTrue(result)
- # Checks that there are 2 ICPs.
- aftl_image = aftl.get_aftl_image(self.output_filename)
- self.assertEqual(aftl_image.image_header.icp_count, 2)
+ # Checks that there are 2 ICPs.
+ aftl_image = aftl.get_aftl_image(output_file.name)
+ self.assertEqual(aftl_image.image_header.icp_count, 2)
- # Verifies the generated image.
- result = aftl.verify_image_icp(**self.verify_icp_default_params)
- self.assertTrue(result)
+ # Verifies the generated image.
+ self.verify_icp_default_params['vbmeta_image_path'] = output_file.name
+ result = aftl.verify_image_icp(**self.verify_icp_default_params)
+ self.assertTrue(result)
- # Prints the image details.
- result = aftl.info_image_icp(**self.info_icp_default_params)
- self.assertTrue(result)
+ # Prints the image details.
+ self.info_icp_default_params['vbmeta_image_path'] = output_file.name
+ result = aftl.info_image_icp(**self.info_icp_default_params)
+ self.assertTrue(result)
def test_info_image_icp(self):
"""Tests info_image_icp with vbmeta image with 2 ICP."""
@@ -1460,7 +1446,7 @@
aftl = self.get_aftl_implementation(aftltool.AftlError('Comms error'))
self.make_icp_default_params[
'transparency_log_configs'][0].target = 'www.google.com:80'
- with open(self.output_filename, 'wb') as output_file:
+ with tempfile.NamedTemporaryFile('wb+') as output_file:
self.make_icp_default_params['output'] = output_file
result = aftl.make_icp_from_vbmeta(
**self.make_icp_default_params)
@@ -1474,7 +1460,7 @@
# time of the transparency log per load test results in b/139407814#2 where
# it was 3.43 seconds.
self.make_icp_default_params['timeout'] = 1
- with open(self.output_filename, 'wb') as output_file:
+ with tempfile.NamedTemporaryFile('wb+') as output_file:
self.make_icp_default_params['output'] = output_file
result = aftl.make_icp_from_vbmeta(
**self.make_icp_default_params)
@@ -1484,14 +1470,18 @@
"""Tests load_test_aftl command with 1 process which does 1 submission."""
aftl = self.get_aftl_implementation(self.test_afi_resp)
- result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
- self.assertTrue(result)
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ self.load_test_aftl_default_params[
+ 'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv')
+ result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
+ self.assertTrue(result)
- output = self.load_test_aftl_default_params['output'].getvalue()
- self.assertRegex(output, 'Succeeded:.+?1\n')
- self.assertRegex(output, 'Failed:.+?0\n')
+ output = self.load_test_aftl_default_params['output'].getvalue()
+ self.assertRegex(output, 'Succeeded:.+?1\n')
+ self.assertRegex(output, 'Failed:.+?0\n')
- self.assertTrue(os.path.exists(self.load_test_stats_file_p1_s1))
+ self.assertTrue(os.path.exists(
+ self.load_test_aftl_default_params['stats_filename']))
def test_load_test_multi_process_multi_submission(self):
"""Tests load_test_aftl command with 2 processes and 2 submissions each."""
@@ -1499,14 +1489,18 @@
self.load_test_aftl_default_params['process_count'] = 2
self.load_test_aftl_default_params['submission_count'] = 2
- result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
- self.assertTrue(result)
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ self.load_test_aftl_default_params[
+ 'stats_filename'] = os.path.join(tmp_dir, 'load_test.csv')
+ result = aftl.load_test_aftl(**self.load_test_aftl_default_params)
+ self.assertTrue(result)
- output = self.load_test_aftl_default_params['output'].getvalue()
- self.assertRegex(output, 'Succeeded:.+?4\n')
- self.assertRegex(output, 'Failed:.+?0\n')
+ output = self.load_test_aftl_default_params['output'].getvalue()
+ self.assertRegex(output, 'Succeeded:.+?4\n')
+ self.assertRegex(output, 'Failed:.+?0\n')
- self.assertTrue(os.path.exists(self.load_test_stats_file_p2_p2))
+ self.assertTrue(os.path.exists(
+ self.load_test_aftl_default_params['stats_filename']))
def test_load_test_invalid_grpc_service(self):
"""Tests load_test_aftl command with a host that does not support GRPC."""