beancount-cde-importer/beancount_cde_importer/__init__.py

132 lines
4.3 KiB
Python

import csv
import re
from datetime import datetime, date
from itertools import islice
from typing import Any, Optional
from beancount.core import data # type: ignore
from beancount.core.amount import Amount # type: ignore
from beancount.core.number import Decimal # type: ignore
from beancount.ingest import cache, importer # type: ignore
COL_DATE = "Date de comptabilisation"
COL_LABEL = "Libelle simplifie"
COL_DEBIT = "Debit"
COL_CREDIT = "Credit"
COL_DETAIL = "Informations complementaires"
COL_DATE_OP = "Date operation"
COL_CATEGORY = "Categorie"
COL_SUBCATEGORY = "Sous categorie"
END_DATE_REGEX = "Date de fin de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})"
START_DATE_REGEX = "Date de début de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})"
EXPECTED_HEADER = "Date de comptabilisation;Libelle simplifie;Libelle operation;Reference;Informations complementaires;Type operation;Categorie;Sous categorie;Debit;Credit;Date operation;Date de valeur;Pointage operation"
def is_valid_header(header: str) -> bool:
return header == EXPECTED_HEADER
class CDEImporter(importer.ImporterProtocol):
def __init__(self, account: str):
self.account = account
def identify(self, file: cache._FileMemo) -> bool:
try:
# NOTE: beancount.ingest.cache._FileMemo handles automatic encoding
# detection
lines: list[str] = file.head().splitlines()
header: str = lines[0]
return is_valid_header(header)
except:
return False
def file_account(self, file: cache._FileMemo) -> Optional[str]:
return self.account
def file_name(self, file: cache._FileMemo) -> Optional[str]:
return "CaisseEpargne_Statement.csv"
def file_date(self, file: cache._FileMemo) -> Optional[date]:
lines: list[str] = file.contents().splitlines()
csv_reader: csv.DictReader = csv.DictReader(
lines, delimiter=";", strict=True, quoting=csv.QUOTE_NONE
)
row: Optional[dict[str, str]] = next(csv_reader)
if row is None:
return None
return datetime.strptime(row[COL_DATE], "%d/%m/%Y").date()
def extract(self, file: cache._FileMemo, existing_entries=None) -> list[Any]:
directives: list[Any] = []
lines: list[str] = file.contents().splitlines()
csv_reader = csv.DictReader(
lines, delimiter=";", strict=True, quoting=csv.QUOTE_NONE
)
for index, row in enumerate(csv_reader):
lineno: int = index + 2 # entries start at line 2
meta = data.new_metadata(file.name, lineno)
transaction_date: date = datetime.strptime(row[COL_DATE], "%d/%m/%Y").date()
op_date: date = datetime.strptime(row[COL_DATE_OP], "%d/%m/%Y").date()
category: str = row[COL_CATEGORY]
subcategory: str = row[COL_SUBCATEGORY]
label: str = row[COL_LABEL]
debit: str = row[COL_DEBIT]
credit: str = row[COL_CREDIT]
detail: str = row[COL_DETAIL] if row[COL_DETAIL] else ""
postings: list[data.Posting] = []
if debit:
amount = Decimal(debit.replace(",", "."))
postings.append(
data.Posting(
self.account,
Amount(amount, "EUR"),
None,
None,
None,
None,
)
)
if credit:
amount = Decimal(credit.replace(",", "."))
postings.append(
data.Posting(
self.account,
Amount(amount, "EUR"),
None,
None,
None,
None,
)
)
meta["op_date"] = op_date
meta["ce_category"] = f"{category} - {subcategory}"
directives.append(
data.Transaction(
meta,
transaction_date,
self.FLAG,
label,
detail,
data.EMPTY_SET,
data.EMPTY_SET,
postings,
)
)
return directives