diff --git a/prometheus_client/values.py b/prometheus_client/values.py index 6ff85e3b..7a3c3033 100644 --- a/prometheus_client/values.py +++ b/prometheus_client/values.py @@ -5,6 +5,15 @@ from .mmap_dict import mmap_key, MmapedDict +_multi_process_cleanups = [] + + +def close_all_multiprocess_files(): + for cleanup in _multi_process_cleanups: + cleanup() + _multi_process_cleanups.clear() + + class MutexValue: """A float protected by a mutex.""" @@ -52,6 +61,13 @@ def MultiProcessValue(process_identifier=os.getpid): # This avoids the need to also have mutexes in __MmapDict. lock = Lock() + def cleanup(): + for f in files.values(): + f.close() + files.clear() + values.clear() + _multi_process_cleanups.append(cleanup) + class MmapedValue: """A float protected by a mutex backed by a per-process mmaped file.""" @@ -122,6 +138,14 @@ def get_exemplar(self): # TODO: Implement exemplars for multiprocess mode. return None + @classmethod + def close_all_files(cls): + with lock: + for f in files.values(): + f.close() + files.clear() + values.clear() + return MmapedValue diff --git a/tests/test_asgi.py b/tests/test_asgi.py index 6e795e21..028dac2b 100644 --- a/tests/test_asgi.py +++ b/tests/test_asgi.py @@ -32,19 +32,24 @@ def setUp(self): # Setup ASGI scope self.scope = {} setup_testing_defaults(self.scope) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) self.communicator = None def tearDown(self): if self.communicator: - asyncio.new_event_loop().run_until_complete( + self.loop.run_until_complete( self.communicator.wait() ) + self.loop.close() def seed_app(self, app): - self.communicator = ApplicationCommunicator(app, self.scope) + async def _init(): + self.communicator = ApplicationCommunicator(app, self.scope) + self.loop.run_until_complete(_init()) def send_input(self, payload): - asyncio.new_event_loop().run_until_complete( + self.loop.run_until_complete( self.communicator.send_input(payload) ) @@ -52,7 +57,7 @@ def send_default_request(self): self.send_input({"type": "http.request", "body": b""}) def get_output(self): - output = asyncio.new_event_loop().run_until_complete( + output = self.loop.run_until_complete( self.communicator.receive_output(0) ) return output @@ -148,9 +153,9 @@ def test_gzip(self): increments = 2 self.increment_metrics(metric_name, help_text, increments) app = make_asgi_app(self.registry) - self.seed_app(app) # Send input with gzip header. self.scope["headers"] = [(b"accept-encoding", b"gzip")] + self.seed_app(app) self.send_input({"type": "http.request", "body": b""}) # Assert outputs are compressed. outputs = self.get_all_output() @@ -164,9 +169,9 @@ def test_gzip_disabled(self): self.increment_metrics(metric_name, help_text, increments) # Disable compression explicitly. app = make_asgi_app(self.registry, disable_compression=True) - self.seed_app(app) # Send input with gzip header. self.scope["headers"] = [(b"accept-encoding", b"gzip")] + self.seed_app(app) self.send_input({"type": "http.request", "body": b""}) # Assert outputs are not compressed. outputs = self.get_all_output() @@ -175,8 +180,8 @@ def test_gzip_disabled(self): def test_openmetrics_encoding(self): """Response content type is application/openmetrics-text when appropriate Accept header is in request""" app = make_asgi_app(self.registry) - self.seed_app(app) self.scope["headers"] = [(b"Accept", b"application/openmetrics-text; version=1.0.0")] + self.seed_app(app) self.send_input({"type": "http.request", "body": b""}) content_type = self.get_response_header_value('Content-Type').split(";")[0] @@ -204,8 +209,8 @@ def test_qs_parsing(self): self.increment_metrics(*m) for i_1 in range(len(metrics)): - self.seed_app(app) self.scope['query_string'] = f"name[]={metrics[i_1][0]}_total".encode("utf-8") + self.seed_app(app) self.send_default_request() outputs = self.get_all_output() @@ -220,7 +225,7 @@ def test_qs_parsing(self): self.assert_not_metrics(output, *metrics[i_2]) - asyncio.new_event_loop().run_until_complete( + self.loop.run_until_complete( self.communicator.wait() ) @@ -237,8 +242,8 @@ def test_qs_parsing_multi(self): for m in metrics: self.increment_metrics(*m) - self.seed_app(app) self.scope['query_string'] = "&".join([f"name[]={m[0]}_total" for m in metrics[0:2]]).encode("utf-8") + self.seed_app(app) self.send_default_request() outputs = self.get_all_output() @@ -249,6 +254,6 @@ def test_qs_parsing_multi(self): self.assert_metrics(output, *metrics[1]) self.assert_not_metrics(output, *metrics[2]) - asyncio.new_event_loop().run_until_complete( + self.loop.run_until_complete( self.communicator.wait() ) diff --git a/tests/test_multiprocess.py b/tests/test_multiprocess.py index ee0c7423..eab93f50 100644 --- a/tests/test_multiprocess.py +++ b/tests/test_multiprocess.py @@ -24,21 +24,26 @@ def setUp(self): def tearDown(self): os.environ.pop('prometheus_multiproc_dir', None) os.environ.pop('PROMETHEUS_MULTIPROC_DIR', None) + values.close_all_multiprocess_files() values.ValueClass = MutexValue shutil.rmtree(self.tempdir) def test_deprecation_warning(self): os.environ['prometheus_multiproc_dir'] = self.tempdir with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") values.ValueClass = get_value_class() registry = CollectorRegistry() collector = MultiProcessCollector(registry) Counter('c', 'help', registry=None) assert os.environ['PROMETHEUS_MULTIPROC_DIR'] == self.tempdir - assert len(w) == 1 - assert issubclass(w[-1].category, DeprecationWarning) - assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message) + if os.name != 'nt': + assert len(w) == 1 + assert issubclass(w[-1].category, DeprecationWarning) + assert "PROMETHEUS_MULTIPROC_DIR" in str(w[-1].message) + else: + assert len(w) == 0 def test_mark_process_dead_respects_lowercase(self): os.environ['prometheus_multiproc_dir'] = self.tempdir @@ -61,8 +66,9 @@ def _value_class(self): def tearDown(self): del os.environ['PROMETHEUS_MULTIPROC_DIR'] - shutil.rmtree(self.tempdir) + values.close_all_multiprocess_files() values.ValueClass = MutexValue + shutil.rmtree(self.tempdir) def test_counter_adds(self): c1 = Counter('c', 'help', registry=None) @@ -119,6 +125,7 @@ def test_gauge_liveall(self): g2.set(2) self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'})) self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(None, self.registry.get_sample_value('g', {'pid': '123'})) self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) @@ -140,6 +147,7 @@ def test_gauge_livemin(self): g1.set(1) g2.set(2) self.assertEqual(1, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(2, self.registry.get_sample_value('g')) @@ -160,6 +168,7 @@ def test_gauge_livemax(self): g1.set(2) g2.set(1) self.assertEqual(2, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(1, self.registry.get_sample_value('g')) @@ -171,6 +180,7 @@ def test_gauge_sum(self): g1.set(1) g2.set(2) self.assertEqual(3, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(3, self.registry.get_sample_value('g')) @@ -182,6 +192,7 @@ def test_gauge_livesum(self): g1.set(1) g2.set(2) self.assertEqual(3, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(2, self.registry.get_sample_value('g')) @@ -192,6 +203,7 @@ def test_gauge_mostrecent(self): g2.set(2) g1.set(1) self.assertEqual(1, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(1, self.registry.get_sample_value('g')) @@ -202,6 +214,7 @@ def test_gauge_livemostrecent(self): g2.set(2) g1.set(1) self.assertEqual(1, self.registry.get_sample_value('g')) + values.close_all_multiprocess_files() mark_process_dead(123, os.environ['PROMETHEUS_MULTIPROC_DIR']) self.assertEqual(2, self.registry.get_sample_value('g')) @@ -626,6 +639,7 @@ def test_corruption_detected(self): list(self.d.read_all_values()) def tearDown(self): + self.d.close() os.unlink(self.tempfile) diff --git a/tests/test_parser.py b/tests/test_parser.py index 49c4dc8c..1a23d809 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -375,8 +375,15 @@ def collect(self): self.assertEqual(text.encode('utf-8'), generate_latest(registry, ALLOWUTF8)) -def test_benchmark_text_string_to_metric_families(benchmark): - text = """# HELP go_gc_duration_seconds A summary of the GC invocation durations. +try: + import pytest_benchmark + HAS_BENCHMARK = True +except ImportError: + HAS_BENCHMARK = False + +if HAS_BENCHMARK: + def test_benchmark_text_string_to_metric_families(benchmark): + text = """# HELP go_gc_duration_seconds A summary of the GC invocation durations. # TYPE go_gc_duration_seconds summary go_gc_duration_seconds{quantile="0"} 0.013300656000000001 go_gc_duration_seconds{quantile="0.25"} 0.013638736 @@ -422,11 +429,11 @@ def test_benchmark_text_string_to_metric_families(benchmark): hist_sum 2 """ - @benchmark - def _(): - # We need to convert the generator to a full list in order to - # accurately measure the time to yield everything. - return list(text_string_to_metric_families(text)) + @benchmark + def _(): + # We need to convert the generator to a full list in order to + # accurately measure the time to yield everything. + return list(text_string_to_metric_families(text)) if __name__ == '__main__':