Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 6 additions & 27 deletions hapiclient/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,21 @@ def meta_cache_write(meta, SERVER, DATASET, opts):
"""Write metadata to JSON and PKL cache files."""

import os
import json
import pickle

from hapiclient.log import log
from hapiclient.util import write_atomic

if not opts["cache"]:
return

paths = meta_cache_paths(SERVER, DATASET, opts['cachedir'])
fnamejson, fnamepkl = paths['json'], paths['pkl']

server_dir = cachedir(opts["cachedir"], SERVER)
os.makedirs(server_dir, exist_ok=True)

log('Writing %s ' % os.path.basename(fnamejson))
with open(fnamejson, 'w') as f:
json.dump(meta, f, indent=4)
write_atomic(fnamejson, meta)

log('Writing %s ' % os.path.basename(fnamepkl))
with open(fnamepkl, 'wb') as f:
# protocol=2 used for Python 2.7 compatibility.
pickle.dump(meta, f, protocol=2)
write_atomic(fnamepkl, meta)


def data_cache_paths(SERVER, DATASET, PARAMETERS, START, STOP, cachedir):
Expand Down Expand Up @@ -215,11 +208,9 @@ def data_cache_write(data_result, meta, SERVER, DATASET, PARAMETERS, START, STOP
"""

import os
import pickle
import warnings
import numpy as np

from hapiclient.log import log
from hapiclient.util import write_atomic

data_paths = data_cache_paths(SERVER, DATASET, PARAMETERS, START, STOP, opts['cachedir'])
fnamecsv, fnamebin, fnamenpy, fnamepklx = data_paths['csv'], data_paths['bin'], data_paths['npy'], data_paths['pkl']
Expand All @@ -236,20 +227,8 @@ def data_cache_write(data_result, meta, SERVER, DATASET, PARAMETERS, START, STOP
# Need to return after meta is updated.
return

server_dir = cachedir(opts["cachedir"], SERVER)
os.makedirs(server_dir, exist_ok=True)

log('Writing %s' % os.path.basename(fnamepklx))
with open(fnamepklx, 'wb') as f:
pickle.dump(meta, f, protocol=2)
write_atomic(fnamepklx, meta)

log('Writing %s' % os.path.basename(fnamenpy))
with warnings.catch_warnings():
# Ignore warning that occurs when saving Unicode data.
kwargs = {
'message': r"Stored array in format 3\.0.*",
'category': UserWarning,
'module': r"numpy\.lib\.format"
}
warnings.filterwarnings("ignore", **kwargs)
np.save(fnamenpy, data_result)
write_atomic(fnamenpy, data_result)
2 changes: 1 addition & 1 deletion hapiclient/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def data(SERVER, DATASET, PARAMETERS, START, STOP, opts):
log('STOP was given as None. Getting stopDate for dataset.')
meta = info(SERVER, DATASET, None, opts)
STOP = meta['stopDate']
log('Using STOP = {STOP}')
log(f'Using STOP = {STOP}')

tic_totalTime = time.time()

Expand Down
90 changes: 71 additions & 19 deletions hapiclient/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ def error(msg, debug=False):
from inspect import stack
from os import path

debug = False
if pythonshell() != 'shell':
try:
from IPython.core.interactiveshell import InteractiveShell
Expand All @@ -214,15 +213,15 @@ def prefix():
import platform
prefix = "\033[0;31mHAPIError:\033[0m "
if platform.system() == 'Windows' and pythonshell() == 'shell':
prefix = "HAPIError: "
prefix = "HAPIError: "

return prefix

def exception_handler_ipython(self, exc_tuple=None,
filename=None, tb_offset=None,
exception_only=False,
running_compiled_code=False):

exception = sys.exc_info()
if not debug and exception[0].__name__ == "HAPIError":
sys.stderr.write(prefix() + str(exception[1]))
Expand Down Expand Up @@ -347,26 +346,19 @@ def get_full_class_name(obj):


def urlretrieve(url, fname):
"""Download URL to file
"""Download URL to file atomically.

res = urlretrieve(url, fname)
"""

import os
import shutil

dirname = os.path.dirname(fname)
if not os.path.exists(dirname):
os.makedirs(dirname)
log('Writing')
log(' %s' % url)
log('to')
log(' %s' % fname)
res = urlopen(url)
write_atomic(fname, res)

with open(fname, 'wb') as out:
res = urlopen(url)
log('Writing')
log('%s' % url)
log('to')
log('%s' % os.path.basename(fname))
shutil.copyfileobj(res, out)
return res
return res


def subset_meta(meta, params):
Expand Down Expand Up @@ -394,7 +386,7 @@ def subset_meta(meta, params):
pa = [meta['parameters'][0]] # First parameter is always the time parameter

params_reordered = [] # Re-ordered params
# If time parameter explicity requested, put it first in params_reordered.
# If time parameter explicitly requested, put it first in params_reordered.
if meta['parameters'][0]['name'] in p:
params_reordered = [meta['parameters'][0]['name']]

Expand Down Expand Up @@ -454,3 +446,63 @@ def missing_length(meta, opts):
return True

return False


def write_atomic(path, data):

import os
import json
import pickle
import pathlib
import secrets
import warnings

import numpy

path = pathlib.Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
tmp_ext = f".{secrets.token_hex(3)}.tmp"
tmp_path = path.with_suffix(path.suffix + tmp_ext)

try:

if path.suffix == '.json':
with tmp_path.open('w') as f:
json.dump(data, f, indent=2)

if path.suffix == '.pkl':
with tmp_path.open('wb') as f:
pickle.dump(data, f, protocol=2)

if path.suffix == '.npy':
with warnings.catch_warnings():
# Ignore warning that occurs when saving Unicode data.
kwargs = {
'message': r"Stored array in format 3\.0.*",
'category': UserWarning,
'module': r"numpy\.lib\.format"
}
warnings.filterwarnings("ignore", **kwargs)
with tmp_path.open('wb') as f:
numpy.save(f, data)

if path.suffix in ('.bin', '.csv'):
with tmp_path.open('wb') as f:
if isinstance(data, (bytes, bytearray)):
f.write(data)
else:
# Assume a file-like / streaming response object.
import shutil
shutil.copyfileobj(data, f)

try:
os.replace(tmp_path, path)
except Exception as e:
warning(f"Failed to rename cache file from {tmp_path} to {path}: {e}")

except Exception as e:
warning(f"Failed to write cache file {tmp_path}: {e}")
try:
tmp_path.unlink()
except OSError:
warning(f"Failed to remove temporary cache file {tmp_path}")
119 changes: 119 additions & 0 deletions test/test_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# See ../README.md for instructions on running tests.
import shutil

from hapiclient.hapi import hapi

from util import compare

from util.get_logger import get_logger
logger = get_logger(__name__)

kwargs = {
'cache': False,
'usecache': False,
'cachedir': '/tmp/hapi-data',
'logging': False
}

server = 'http://hapi-server.org/servers/TestData2.0/hapi'
dataset = 'dataset1'
start = '1970-01-01'
stop = '1970-01-01T00:00:03'

def test_cache_short():

# Compare read with empty cache with read with hot cache and usecache=True

opts = {**kwargs, 'cache': True}

opts['usecache'] = False
shutil.rmtree(opts['cachedir'], ignore_errors=True)
data, _ = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)

opts['usecache'] = True
data2, _ = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)

assert compare.equal(data, data2)


def test_cache_error():

from unittest.mock import patch

import io
import contextlib
import pathlib
import tempfile
from hapiclient.util import write_atomic

def assert_warns(fn, expected):
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
result = fn()
print(buf.getvalue())
msg = f"Expected '{expected}' in stderr: {buf.getvalue()!r}"
assert expected in buf.getvalue(), msg
return result

# Direct calls to write_atomic()
with tempfile.TemporaryDirectory() as tmpdir:
path = pathlib.Path(tmpdir) / 'test.json'
data = {'key': 'value'}

def call_write_atomic():
write_atomic(str(path), data)

# Simulate write failure
with patch('json.dump', side_effect=OSError('No space left on device')):
assert_warns(call_write_atomic, 'Failed to write cache file')
assert not path.exists(), 'File should not exist after write failure'

# Simulate os.replace failure after successful write
with patch('os.replace', side_effect=OSError('Permission denied')):
assert_warns(call_write_atomic, 'Failed to rename cache file from')
assert not path.exists(), 'File should not exist after rename failure'

# Simulate write failure AND unlink failure
patch1 = patch('json.dump', side_effect=OSError('No space left on device'))
patch2 = patch('pathlib.Path.unlink', side_effect=OSError('Permission denied'))
with patch1, patch2:
assert_warns(call_write_atomic, 'Failed to remove temporary cache file')

# Simulate successful write
write_atomic(str(path), data)
assert path.exists(), 'File should exist after successful write'


# Indirect calls to write_atomic()
dataset = 'dataset1'
start = '1970-01-01'
stop = '1970-01-01T00:00:03'

opts = {**kwargs, 'cache': True, 'usecache': False}

def call_hapi():
data, meta = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)
return data, meta

def assert_data_valid(data):
msg = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)
assert data['Time'][0] == b'1970-01-01T00:00:00.000Z', msg

with patch('pickle.dump', side_effect=OSError('No space left on device')):
data, _ = assert_warns(call_hapi, 'Failed to write cache file')
assert_data_valid(data)

with patch('os.replace', side_effect=OSError('Permission denied')):
data, _ = assert_warns(call_hapi, 'Failed to rename cache file from')
assert_data_valid(data)

p1 = patch('pickle.dump', side_effect=OSError('No space left on device'))
p2 = patch('pathlib.Path.unlink', side_effect=OSError('Permission denied'))
with p1, p2:
data, _ = assert_warns(call_hapi, 'Failed to remove temporary cache file')
assert_data_valid(data)


if __name__ == '__main__':
test_cache_short()
test_cache_error()
21 changes: 1 addition & 20 deletions test/test_hapi_data_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,6 @@ def test_server(version):
pytest.fail("test_server('%s') raised: %s" % (version, e))


def test_cache_short():

# Compare read with empty cache with read with hot cache and usecache=True
dataset = 'dataset1'
start = '1970-01-01'
stop = '1970-01-01T00:00:03'

opts = {**kwargs, 'cache': True}

opts['usecache'] = False
shutil.rmtree(opts['cachedir'], ignore_errors=True)
data, meta = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)

opts['usecache'] = True
data2, meta2 = hapi(server, dataset, 'scalarint,vectorint', start, stop, **opts)

assert compare.equal(data, data2)


def test_subset_short():

dataset = 'dataset1'
Expand Down Expand Up @@ -212,6 +193,7 @@ def test_unicode():
assert compare.read(server, dataset, parameter, run, opts.copy(), logger=logger)
assert compare.cache(server, dataset, parameter, opts.copy(), logger=logger)


def test_empty_response():
from hapiclient import hapi

Expand Down Expand Up @@ -249,6 +231,5 @@ def test_empty_response():
test_reader_timing_long()
test_all_test_servers()
test_subset_short()
test_cache_short()
test_request2path()
test_unicode()
Loading