import csv import re from datetime import datetime, date from itertools import islice from typing import Any, Optional from beancount.core import data # type: ignore from beancount.core.amount import Amount # type: ignore from beancount.core.number import Decimal # type: ignore from beancount.ingest import cache, importer # type: ignore COL_DATE = "Date de comptabilisation" COL_LABEL = "Libelle simplifie" COL_DETAILED_LABEL = "Libelle operation" COL_DEBIT = "Debit" COL_CREDIT = "Credit" COL_DETAIL = "Informations complementaires" COL_DATE_OP = "Date operation" COL_CATEGORY = "Categorie" COL_SUBCATEGORY = "Sous categorie" END_DATE_REGEX = "Date de fin de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})" START_DATE_REGEX = "Date de début de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})" EXPECTED_HEADER = "Date de comptabilisation;Libelle simplifie;Libelle operation;Reference;Informations complementaires;Type operation;Categorie;Sous categorie;Debit;Credit;Date operation;Date de valeur;Pointage operation" def is_valid_header(header: str) -> bool: return header == EXPECTED_HEADER class CDEImporter(importer.ImporterProtocol): def __init__(self, account: str): self.account = account def identify(self, file: cache._FileMemo) -> bool: try: # NOTE: beancount.ingest.cache._FileMemo handles automatic encoding # detection lines: list[str] = file.head().splitlines() header: str = lines[0] return is_valid_header(header) except: return False def file_account(self, file: cache._FileMemo) -> Optional[str]: return self.account def file_name(self, file: cache._FileMemo) -> Optional[str]: return "CaisseEpargne_Statement.csv" def file_date(self, file: cache._FileMemo) -> Optional[date]: lines: list[str] = file.contents().splitlines() csv_reader: csv.DictReader = csv.DictReader( lines, delimiter=";", strict=True, quoting=csv.QUOTE_NONE ) row: Optional[dict[str, str]] = next(csv_reader) if row is None: return None return datetime.strptime(row[COL_DATE], "%d/%m/%Y").date() def extract(self, file: cache._FileMemo, existing_entries=None) -> list[Any]: directives: list[Any] = [] lines: list[str] = file.contents().splitlines() csv_reader = csv.DictReader( lines, delimiter=";", strict=True, quoting=csv.QUOTE_NONE ) for index, row in enumerate(csv_reader): lineno: int = index + 2 # entries start at line 2 meta = data.new_metadata(file.name, lineno) transaction_date: date = datetime.strptime(row[COL_DATE], "%d/%m/%Y").date() op_date: date = datetime.strptime(row[COL_DATE_OP], "%d/%m/%Y").date() category: str = row[COL_CATEGORY] subcategory: str = row[COL_SUBCATEGORY] label: str = row[COL_LABEL] detailed_label: str = row[COL_DETAILED_LABEL] debit: str = row[COL_DEBIT] credit: str = row[COL_CREDIT] detail: str = row[COL_DETAIL] if row[COL_DETAIL] else "" postings: list[data.Posting] = [] if debit: amount = Decimal(debit.replace(",", ".")) postings.append( data.Posting( self.account, Amount(amount, "EUR"), None, None, None, None, ) ) if credit: amount = Decimal(credit.replace(",", ".")) postings.append( data.Posting( self.account, Amount(amount, "EUR"), None, None, None, None, ) ) meta["op_date"] = op_date meta["ce_category"] = f"{category} - {subcategory}" meta["detailed_label"] = detailed_label directives.append( data.Transaction( meta, transaction_date, self.FLAG, label, detail, data.EMPTY_SET, data.EMPTY_SET, postings, ) ) return directives