Hello from Spain! We’re not developers; we’re in charge of systems and infrastructure, and we’ve only recently started using Python and Ansible.
Our goal is related to cybersecurity requirements from ISO27001 and DORA. These standards are very strict when it comes to backups — they require verification and evidence of successful restoration to demonstrate resilience.
To achieve this, we use Ansible to copy the “dockers” and “volumes” folders (persistent container data) to an encrypted NAS. We generate three log files: one for the backup, one for the restoration, and one as evidence. The evidence file is the result of accessing the restored machine and confirming that the containers respond properly.
As required by the standard, we store these files in Nextcloud. This is where Python comes into play:
In a Nextcloud Tables registry, we want to automatically add the links to the backup, restoration, and evidence files every day. If we write the internal file URLs directly in text fields, everything works fine, but we’d like to make it cleaner by using the “link” cell type: display the filename as the text, with the file URL attached to it.
Just to be clear, we’re not professional programmers. We looked into how to use the Tables API to create new records with these fields. It works when the columns are of type “text,” but when they’re of type “link,” the URL gets truncated and doesn’t work properly.
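To illustrate, here is a minimal version of the request we send (a simplified sketch of the full script below; the table ID, column ID, and credentials are just examples):

```python
import requests

# Example column ID "42" mapped to the column that should hold the URL.
# With a "text" column this stores the full URL; with a "link" column
# the stored value comes back truncated.
payload = {"data": {"42": "https://nextcloud.example.com/s/AbCdEfG"}}
r = requests.post(
    "https://nextcloud.example.com/apps/tables/api/1/tables/12/rows",
    json=payload,
    auth=("backupuser", "app-password"),
    headers={"OCS-APIRequest": "true"},
    timeout=20,
)
print(r.status_code, r.text)
```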
Sorry for our English; it’s not our native language, but we do our best! Thanks a lot for your help and for taking the time to look at this.
Below you’ll find all our code.
```python
#!/usr/bin/env python3
import requests
import argparse
import datetime
import json
import logging
import sys
import xml.etree.ElementTree as ET
import re
import time
import random
import urllib.parse
import os
# --- CONFIGURATION ---
NC_BASE_URL = "https://nextcloud.example.com"
TABLES_API_BASE = f"{NC_BASE_URL}/apps/tables/api/1/tables"
WEBDAV_BASE_URL = f"{NC_BASE_URL}/remote.php/dav/files"
NS = {'d': 'DAV:', 'oc': 'http://owncloud.org/ns'}

# --- CHANGE MADE HERE ---
# Added mapping for the daily backup table (ID 12)
FOLDER_TABLE_MAP = {
    "carpeta_logs_de_automatizacion": "7",
    "copias_completas": "8",
    "copias_diarias": "12"
}

# Matches log names such as "001_2025-10-17_03-00.back.md"
LOG_PATTERN = re.compile(
    r'^(?P<base_full>\d{3}_(?P<day>\d{4}-\d{2}-\d{2})_\d{2}-\d{2})\.(?P<ext>back|rest|evi)\.md$'
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Backoff utilities
def sleep_backoff(attempt, base=1, cap=30):
    """Exponential backoff with jitter."""
    delay = min(cap, base * (2 ** (attempt - 1)))
    jitter = random.random()
    return delay + jitter
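
# Note: sleep_backoff() only computes and returns the delay; the caller
# is expected to do time.sleep(sleep_backoff(attempt)). It is not
# actually called anywhere in this script yet.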
# ================================================================
# --- CHANGE: URL shortening ---
# ================================================================
def shorten_url_tinyurl(long_url):
    """Shorten a URL using the public TinyURL API."""
    if not long_url:
        return long_url
    try:
        api_url = f"https://tinyurl.com/api-create.php?url={urllib.parse.quote(long_url)}"
        response = requests.get(api_url, timeout=10)
        if response.status_code == 200 and response.text.startswith("http"):
            return response.text.strip()
        else:
            logging.warning(f"Could not shorten URL ({response.status_code}), using original.")
            return long_url
    except Exception as e:
        logging.warning(f"Error shortening URL: {e}")
        return long_url
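
# Example (hypothetical output):
#   shorten_url_tinyurl("https://nextcloud.example.com/s/AbCdEfG")
#   -> "https://tinyurl.com/2abcdefg"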
# ================================================================
# HELPER FUNCTIONS
# ================================================================
def get_auth_headers(nc_user, nc_credential, auth_method):
    if auth_method not in ['basic', 'token']:
        raise ValueError(f"Invalid authentication method: {auth_method}")
    # For Nextcloud, an app token is sent the same way as a basic password.
    auth = (nc_user, nc_credential)
    return {
        "auth": auth,
        "headers": {"OCS-APIRequest": "true", "Content-Type": "application/json"},
        "verify": True,
        "timeout": 20
    }
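
# The returned dict is built so it can be expanded straight into a
# requests call, e.g.: requests.get(url, **get_auth_headers(user, cred, 'token'))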
def get_column_mapping(table_id, auth_config):
    try:
        auth = auth_config['auth']
        headers = auth_config['headers']
        response = requests.get(f"{TABLES_API_BASE}/{table_id}/columns", headers=headers, auth=auth,
                                verify=auth_config['verify'], timeout=auth_config['timeout'])
        if response.status_code == 200:
            columns = response.json()
            mapping = {col['title'].strip(): str(col['id']) for col in columns}
            print(f"\nCOLUMN MAPPING FOUND FOR TABLE {table_id}:")
            for name, col_id in mapping.items():
                print(f"  '{name}' → ID: {col_id}")
            return mapping
        else:
            print(f"Error getting columns: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        logging.error(f"Error connecting to Nextcloud: {e}")
        return None
def diagnose_table(nc_user, nc_credential, auth_method, table_id):
    try:
        auth_config = get_auth_headers(nc_user, nc_credential, auth_method)
        print(f"\nTABLE DIAGNOSTIC {table_id}")
        print("=" * 50)
        response = requests.get(f"{TABLES_API_BASE}/{table_id}", **auth_config)
        if response.status_code == 200:
            table_info = response.json()
            print(f"Table accessible: {table_info.get('title', 'Untitled')}")
        else:
            print(f"Cannot access table: {response.status_code}")
            return False
        mapping = get_column_mapping(table_id, auth_config)
        if not mapping:
            print("Could not retrieve columns")
            return False
        required_columns = ["Fecha", "Link Backup", "Link Restore", "Link evidencia", "NombreFichero"]
        for col in required_columns:
            if col not in mapping:
                print(f"Missing column: '{col}'")
                print(f"  Available columns: {list(mapping.keys())}")
            else:
                print(f"Found column: '{col}' -> ID: {mapping[col]}")
        return True
    except Exception as e:
        print(f"Error during diagnostic: {e}")
        return False
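
# Note: diagnose_table() is not wired into the __main__ block below;
# it is a standalone helper for checking a table's columns by hand.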
def register_backup(nc_user, nc_credential, auth_method, table_id, link_backup, link_restore, link_evidence, file_name, times):
    """
    Registers a row in the Nextcloud Table. 'times' must be a dict with
    keys 'duracion_copias', 'duracion_restore', 'duracion_total'.
    """
    try:
        auth_config = get_auth_headers(nc_user, nc_credential, auth_method)
    except ValueError as e:
        logging.error(str(e))
        return False
    # Shorten links before registering
    link_backup = shorten_url_tinyurl(link_backup)
    link_restore = shorten_url_tinyurl(link_restore)
    link_evidence = shorten_url_tinyurl(link_evidence)
    mapping = get_column_mapping(table_id, auth_config)
    if not mapping:
        return False
    current_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    payload_data = {
        mapping.get("Fecha"): current_date,
        mapping.get("Link Backup"): link_backup,
        mapping.get("Link Restore"): link_restore,
        mapping.get("Link evidencia"): link_evidence,
        mapping.get("NombreFichero"): file_name,
        mapping.get("Tiempo Backup Segundos"): times.get("duracion_copias") if times else None,
        mapping.get("Tiempo Restore Segundos"): times.get("duracion_restore") if times else None,
        mapping.get("Tiempo total Segundos"): times.get("duracion_total") if times else None,
    }
    # Drop entries whose column was not found or whose value is empty
    payload_data_clean = {k: v for k, v in payload_data.items() if k and v is not None and v != ""}
    payload = {"data": payload_data_clean}
    print(f"\nSENDING DATA TO TABLE {table_id} FOR {file_name}:")
    print("=" * 60)
    print(json.dumps(payload, indent=2, ensure_ascii=False))
    print("=" * 60)
    try:
        response = requests.post(f"{TABLES_API_BASE}/{table_id}/rows", json=payload, **auth_config)
        if response.status_code in [200, 201]:
            logging.info(f"Backup successfully registered: {file_name}")
            return True
        else:
            logging.error(f"Error registering backup ({response.status_code}): {response.text}")
            return False
    except Exception as e:
        logging.error(f"Connection error: {e}")
        return False
def list_webdav_files(nc_user, nc_credential, nc_internal_path, auth_method):
    try:
        auth_config = get_auth_headers(nc_user, nc_credential, auth_method)
        auth = auth_config['auth']
        headers = auth_config['headers'].copy()
        headers.pop('Content-Type', None)
        if nc_internal_path.startswith(f'/{nc_user}/'):
            webdav_path = nc_internal_path
        elif nc_internal_path.startswith('/'):
            webdav_path = f'/{nc_user}{nc_internal_path}'
        else:
            webdav_path = f'/{nc_user}/{nc_internal_path}'
        full_url = f"{WEBDAV_BASE_URL}{webdav_path.rstrip('/')}/"
        propfind_body = """<?xml version="1.0" encoding="utf-8" ?>
        <d:propfind xmlns:d="DAV:">
          <d:prop>
            <d:getlastmodified/>
            <d:getcontentlength/>
            <d:resourcetype/>
          </d:prop>
        </d:propfind>"""
        headers['Depth'] = '1'
        response = requests.request('PROPFIND', full_url, data=propfind_body, headers=headers,
                                    auth=auth, verify=auth_config['verify'], timeout=auth_config['timeout'])
        if response.status_code not in [200, 207]:
            logging.error(f"Failed to list WebDAV files. Code: {response.status_code}")
            return []
        root = ET.fromstring(response.text)
        file_list = []
        for i, resp in enumerate(root.findall('d:response', NS)):
            if i == 0:
                # Skip the first entry: it is the folder itself, not a file
                continue
            href = resp.find('d:href', NS).text
            name = urllib.parse.unquote(href.rstrip('/').split('/')[-1])
            file_list.append({'name': name, 'href': href})
        return file_list
    except Exception as e:
        logging.critical(f"Critical WebDAV error: {e}")
        return []
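
# Example return value (path shortened):
#   [{'name': '001_2025-10-17_03-00.back.md',
#     'href': '/remote.php/dav/files/<user>/.../001_2025-10-17_03-00.back.md'}]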
def get_existing_link(nc_user, nc_credential, file_path, auth_method):
    normalized_path = urllib.parse.unquote(file_path.lstrip('/'))
    if normalized_path.startswith(f"{nc_user}/files/"):
        normalized_path = normalized_path[len(f"{nc_user}/files/"):]
    elif normalized_path.startswith(f"{nc_user}/"):
        normalized_path = normalized_path[len(f"{nc_user}/"):]
    auth_config = get_auth_headers(nc_user, nc_credential, auth_method)
    auth = auth_config['auth']
    sharing_url = f"{NC_BASE_URL}/ocs/v2.php/apps/files_sharing/api/v1/shares"
    headers = {"OCS-APIRequest": "true"}
    resp = requests.get(sharing_url, params={'path': f'/{normalized_path}'}, auth=auth, headers=headers,
                        verify=auth_config['verify'], timeout=auth_config['timeout'])
    if resp.status_code == 200:
        try:
            root = ET.fromstring(resp.text)
            url_el = root.find('.//data/element/url')
            if url_el is None:
                url_el = root.find('.//data/url')
            if url_el is not None and url_el.text:
                return url_el.text
        except Exception:
            pass
    return None
def create_public_link(nc_user, nc_credential, file_path, auth_method):
    existing = get_existing_link(nc_user, nc_credential, file_path, auth_method)
    if existing:
        return existing
    normalized_path = urllib.parse.unquote(file_path.lstrip('/'))
    if normalized_path.startswith(f"{nc_user}/files/"):
        normalized_path = normalized_path[len(f"{nc_user}/files/"):]
    elif normalized_path.startswith(f"{nc_user}/"):
        normalized_path = normalized_path[len(f"{nc_user}/"):]
    auth_config = get_auth_headers(nc_user, nc_credential, auth_method)
    auth = auth_config['auth']
    url = f"{NC_BASE_URL}/ocs/v2.php/apps/files_sharing/api/v1/shares"
    headers = {"OCS-APIRequest": "true"}
    # shareType 3 = public link, permissions 1 = read-only
    data = {'path': f'/{normalized_path}', 'shareType': '3', 'permissions': '1'}
    r = requests.post(url, data=data, auth=auth, headers=headers,
                      verify=auth_config['verify'], timeout=auth_config['timeout'])
    if r.status_code in [200, 201]:
        try:
            root = ET.fromstring(r.text)
            url_el = root.find('.//url')
            if url_el is not None and url_el.text:
                return url_el.text
        except Exception as e:
            logging.error(f"Error parsing XML from link creation: {e}")
    else:
        logging.error(f"Error creating link for {normalized_path}: {r.status_code} - {r.text}")
    return None
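
# The OCS response is XML of roughly this shape, which is why we look
# for the <url> element above:
#   <ocs><meta>...</meta><data>...<url>https://nextcloud.example.com/s/TOKEN</url>...</data></ocs>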
# ================================================================
# NEW LOGIC: process all file sets
# ================================================================
def find_and_create_links(nc_user, nc_credential, nc_directory_path, search_day, auth_method='token'):
    files = list_webdav_files(nc_user, nc_credential, nc_directory_path, auth_method)
    if not files:
        logging.warning("Could not list files")
        return
    sets = {}
    map_ext_to_key = {'back': 'backup', 'rest': 'restore', 'evi': 'evidence'}
    for item in files:
        name = item['name']
        match = LOG_PATTERN.match(name)
        if not match:
            continue
        if match.group('day') != search_day:
            continue
        base_full = match.group('base_full')
        ext_log = match.group('ext')
        log_type = map_ext_to_key.get(ext_log)
        if not log_type:
            continue
        sets.setdefault(base_full, {})[log_type] = item['href']
    results = []
    for base_full, types in sets.items():
        if len(types) < 3:
            logging.warning(f"Incomplete set: {base_full}")
            continue
        links = {}
        for tipo, href in types.items():
            internal_path = href.replace(f"/remote.php/dav/files/{nc_user}", "")
            link = create_public_link(nc_user, nc_credential, internal_path, auth_method)
            links[tipo] = link
            time.sleep(0.3)
        results.append({"file_name": base_full, "links": links})
        print(f"Processed set: {base_full}")
    return results
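
# Example return value:
#   [{"file_name": "001_2025-10-17_03-00",
#     "links": {"backup": "...", "restore": "...", "evidence": "..."}}]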
# ================================================================
# MAIN
# ================================================================
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Register backups in Nextcloud Tables with multiple sets")
    parser.add_argument("user")
    parser.add_argument("credential")
    parser.add_argument("auth_method", choices=['basic', 'token'])
    parser.add_argument("--table", default="7")
    parser.add_argument("--process-day", nargs=2, metavar=('PATH', 'DAY'), required=True)
    args = parser.parse_args()
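    # Example invocation (hypothetical script name, user, credential, and folder):
    #   python3 register_backups.py backupuser APP-PASSWORD token \
    #       --table 12 --process-day /copias_diarias 2025-10-17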
    # ==== READ JSON PROVIDED BY ANSIBLE ====
    times_path = "/tiempos/tiempos_backup.json"
    times = None
    if os.path.exists(times_path):
        try:
            with open(times_path, "r") as f:
                times = json.load(f)
            logging.info(f"Read timing JSON from {times_path}: {times}")
        except Exception as e:
            logging.error(f"Error reading {times_path}: {e}")
            times = None
    else:
        logging.warning(f"Timing file {times_path} not found. Time fields will not be included.")
    results = find_and_create_links(args.user, args.credential, args.process_day[0], args.process_day[1], args.auth_method)
    if not results:
        print("No complete sets found.")
        sys.exit(1)
    folder = args.process_day[0].strip('/').split('/')[-1]
    table_id = FOLDER_TABLE_MAP.get(folder, args.table)
    for group in results:
        register_backup(args.user, args.credential, args.auth_method, table_id,
                        group["links"].get("backup", ""),
                        group["links"].get("restore", ""),
                        group["links"].get("evidence", ""),
                        group["file_name"],
                        times)
    print("\nAll sets processed successfully.")
```