Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 97 additions & 5 deletions mgmtworker/cloudify_system_workflows/snapshots/snapshot_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import pathlib
import queue
import shutil
import hashlib
import tarfile
import tempfile
import zipfile
from collections import defaultdict
from contextlib import contextmanager
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -84,6 +87,11 @@ def __init__(
self._auditlog_queue = queue.Queue()
self._auditlog_listener = AuditLogListener(self._client,
self._auditlog_queue)
self._written_archives: dict[str, dict[tuple[str, ...], str]]
self._written_archives = { # track created entities archives
'plugins': {},
'blueprints': {}, # will do for both blueprints and blueprint_revisions
}

def create(self, timeout: float | None = None):
"""Dumps manager's data and some metadata into a single zip file"""
Expand Down Expand Up @@ -263,6 +271,7 @@ def _write_files(
if _should_append_entity(dump_type, entity):
self._auditlog_listener.append_entity(
tenant_name, dump_type, entity)
self._update_written_archives(entity_id, dump_type, output_dir)
# Dump the data as JSON files
filenum = _get_max_filenum_in_dir(output_dir) or 0
for (source, source_id), items in data_buckets.items():
Expand Down Expand Up @@ -308,17 +317,24 @@ def _create_archive(self):
) as zf:
base_dir = os.path.join(root_dir, os.curdir)
base_dir = os.path.normpath(base_dir)
for dirpath, dirnames, filenames in os.walk(base_dir):
for dirpath, dirnames, filenames in os.walk(base_dir, followlinks=False):
root_path = Path(dirpath)
arcdirpath = os.path.relpath(dirpath, root_dir)
for name in sorted(dirnames):
path = os.path.join(dirpath, name)
arcname = os.path.join(arcdirpath, name)
zf.write(path, arcname)
for name in filenames:
path = os.path.join(dirpath, name)
path = os.path.normpath(path)
if os.path.isfile(path):
arcname = os.path.join(arcdirpath, name)
path = root_path / name
arcname = path.relative_to(root_dir)
if path.is_symlink():
zip_info = zipfile.ZipInfo(str(arcname))
zip_info.create_system = 3 # Unix
st = os.lstat(path)
zip_info.external_attr = st.st_mode << 16
link_target = os.readlink(path)
zf.writestr(zip_info, link_target)
elif os.path.isfile(path):
zf.write(path, arcname)

def _upload_archive(self):
Expand Down Expand Up @@ -392,6 +408,26 @@ def _update_snapshot_status(self, status, error=None):
error=error
)

def _update_written_archives(self, entity_id, dump_type, output_dir):
    """Deduplicate the archive just written for *entity_id*.

    Fingerprints the entity's archive and, if a byte-identical archive
    was already written during this snapshot, replaces the new file with
    a relative symlink to the existing one; otherwise records the archive
    so later duplicates can link to it.

    Only 'plugins' (.zip) and 'blueprints' (.tar.gz) dump types produce
    archives; any other dump type is ignored.
    """
    dest_dir = (output_dir / dump_type).resolve()
    suffix = {
        'plugins': '.zip',
        'blueprints': '.tar.gz',
    }.get(dump_type)
    if not suffix:
        # This dump type has no per-entity archive - nothing to dedupe.
        return
    entity_archive = dest_dir / f'{entity_id}{suffix}'
    content_hashes = _get_archive_content_hashes(entity_archive)
    if existing_path := self._written_archives[dump_type].get(content_hashes):
        entity_archive.unlink(missing_ok=False)
        # Make the link target relative to the archive's own directory so
        # the symlink stays valid when the snapshot tree is extracted
        # somewhere else.  (Replaces the previous relpath-from-the-file +
        # split('/', 1) hack, which assumed a '/' separator.)
        os.symlink(
            os.path.relpath(existing_path, entity_archive.parent),
            entity_archive,
        )
        ctx.logger.debug(
            "Created symlink: %s to %s", entity_archive, existing_path)
        return
    self._written_archives[dump_type][content_hashes] = entity_archive


def _prepare_temp_dir() -> Path:
"""Prepare temporary (working) directory structure"""
Expand Down Expand Up @@ -516,3 +552,59 @@ def get_all(method, kwargs=None):
kwargs['_offset'] = len(data)

return data


def _hash_it(content) -> str:
if isinstance(content, str):
content = content.encode('utf-8')
elif not isinstance(content, bytes):
content = str(content).encode('utf-8')
return hashlib.md5(content).hexdigest()


@contextmanager
def _open_archive(path: Path):
if path.name.endswith(".zip"):
with zipfile.ZipFile(path, "r") as arc:
yield "zip", arc
elif path.name.endswith(".tar.gz"):
with tarfile.open(path, "r:gz") as arc:
yield "tar.gz", arc
else:
raise RuntimeWarning("not supported archive type '{}'".format(path))


def _iter_archive_members(arc_type: str, archive: zipfile.ZipFile | tarfile.TarFile):
if arc_type == "zip":
for info in archive.infolist():
yield info.filename, info.is_dir(), lambda i=info: archive.open(i)
else:
for member in archive.getmembers():
yield member.name, member.isdir(), lambda m=member: archive.extractfile(m)


def _get_archive_content_hashes(path: Path) -> tuple[str, ...]:
    """Return a canonical content fingerprint for the archive at *path*.

    The fingerprint is a sorted tuple of MD5 hex digests covering every
    regular file's contents, the set of file names, and the set of empty
    directories.  Sorting (both the joined name sets and the final tuple)
    makes the value independent of set iteration order - which varies
    with hash randomization across processes - so equal archives always
    produce equal, hashable dict keys.
    """
    hashes: set[str] = set()
    all_dirs: set[str] = set()
    not_empty_dirs: set[str] = set()
    filenames: set[str] = set()

    with _open_archive(path) as (arc_type, arc):
        for name, is_dir, open_file in _iter_archive_members(arc_type, arc):
            if is_dir:
                all_dirs.add(name.rstrip("/"))
                continue
            filenames.add(name)
            # Every ancestor directory of a file is non-empty.
            parts = name.split('/')
            for i in range(1, len(parts)):
                not_empty_dirs.add('/'.join(parts[:i]))
            # tarfile.extractfile() returns None for special members
            # (e.g. devices, fifos) - check before entering the context,
            # since `with None:` would raise AttributeError.
            fileobj = open_file()
            if fileobj:
                with fileobj:
                    hashes.add(_hash_it(fileobj.read()))

    if filenames:
        hashes.add(_hash_it(":".join(sorted(filenames))))
    if empty_dirs := all_dirs - not_empty_dirs:
        hashes.add(_hash_it(":".join(sorted(empty_dirs))))
    return tuple(sorted(hashes))
20 changes: 17 additions & 3 deletions mgmtworker/cloudify_system_workflows/snapshots/snapshot_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import uuid
import base64
import shutil
import stat
import zipfile
import tempfile
import threading
import subprocess
from contextlib import contextmanager
from functools import partial
from pathlib import Path
from typing import Any

from cloudify.workflows import ctx
Expand Down Expand Up @@ -76,6 +78,11 @@

# Reproduced/modified from patch for https://bugs.python.org/issue15795
class ZipFile(zipfile.ZipFile):

def __init__(self, *args, **kwargs):
    """Open the zip archive and pre-index its members by name."""
    super().__init__(*args, **kwargs)
    # Map each member's name, stripped of any trailing '/', to its
    # ZipInfo so _extract_member can look up a symlink's target entry
    # by normalized path.
    self._all_entries = {info.filename.rstrip('/'): info for info in self.infolist()}

def _extract_member(self, member, targetpath, pwd):
"""Extract the ZipInfo object 'member' to a physical
file on the path targetpath.
Expand Down Expand Up @@ -112,11 +119,18 @@ def _extract_member(self, member, targetpath, pwd):
os.mkdir(targetpath)
return targetpath

with self.open(member, pwd=pwd) as source, \
open(targetpath, "wb") as target:
_mode = member.external_attr >> 16
if stat.S_ISLNK(_mode):
link = self.read(member.filename).decode('utf-8')
source_path = Path(member.filename).parent / link
member_to_extract = self._all_entries[os.path.normpath(source_path)]
else:
member_to_extract = member

with self.open(member_to_extract, pwd=pwd) as source, open(targetpath, "wb") as target:
shutil.copyfileobj(source, target)

mode = member.external_attr >> 16 & 0xFFF
mode = member_to_extract.external_attr >> 16 & 0xFFF
os.chmod(targetpath, mode)
return targetpath

Expand Down