implement extraction

This commit is contained in:
Antoine Martin 2022-02-05 19:39:01 +01:00
parent 0f19647ba1
commit 98335d1de7
2 changed files with 124 additions and 2 deletions

1
.gitignore vendored
View file

@ -2,3 +2,4 @@
/.direnv/
/dist/
/result
__pycache__/

View file

@ -1,7 +1,13 @@
import csv
from itertools import islice
from typing import Optional
import re
from datetime import datetime, date
from itertools import islice
from typing import Any, Optional
from beancount.core import data # type: ignore
from beancount.core.amount import Amount # type: ignore
from beancount.core.number import Decimal # type: ignore
from beancount.ingest import cache, importer # type: ignore
@ -12,6 +18,9 @@ INDEX_DEBIT = 3
INDEX_CREDIT = 4
INDEX_DETAIL = 5
END_DATE_REGEX = "Date de fin de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})"
START_DATE_REGEX = "Date de début de téléchargement : ([0-3][0-9]/[0-1][0-9]/[0-9]{4})"
def is_valid_header(header: list[str]) -> bool:
return (
@ -24,7 +33,28 @@ def is_valid_header(header: list[str]) -> bool:
)
def get_date(file: cache._FileMemo, regex: str) -> Optional[date]:
match: Optional[re.Match] = re.search(regex, file.head())
if match is None:
return None
date_str: Optional[str] = match.group(1)
if date_str is None:
return None
return datetime.strptime(date_str, "%d/%m/%Y").date()
def get_end_date(file: cache._FileMemo) -> Optional[date]:
return get_date(file, END_DATE_REGEX)
def get_start_date(file: cache._FileMemo) -> Optional[date]:
return get_date(file, START_DATE_REGEX)
class CaisseDEpargneImporter(importer.ImporterProtocol):
def __init__(self, account: str):
self.account = account
def identify(self, file: cache._FileMemo) -> bool:
try:
# NOTE: beancount.ingest.cache._FileMemo handles automatic encoding
@ -44,3 +74,94 @@ class CaisseDEpargneImporter(importer.ImporterProtocol):
except:
return False
def file_account(self, file: cache._FileMemo) -> Optional[str]:
return self.account
def file_name(self, file: cache._FileMemo) -> Optional[str]:
return "CaisseEpargne_Statement.csv"
def file_date(self, file: cache._FileMemo) -> Optional[date]:
return get_end_date(file)
def extract(self, file: cache._FileMemo, existing_entries=None) -> list[Any]:
directives: list[Any] = []
end_date: Optional[date] = get_end_date(file)
start_date: Optional[date] = get_start_date(file)
if end_date is None or start_date is None:
return directives
lines: list[str] = file.contents().splitlines()
csv_reader = csv.reader(
lines, delimiter=";", strict=True, quoting=csv.QUOTE_NONE
)
# first 3 lines are useless
for _ in range(3):
next(csv_reader)
# 4th line is usually the final balance
row: Optional[list[str]] = next(csv_reader)
if row is None:
return directives
if row[0] == "Solde en fin de période":
meta = data.new_metadata(file.name, 4)
balance = Decimal(row[4].replace(",", "."))
directives.append(
data.Balance(
meta=meta,
date=end_date,
account=self.account,
amount=Amount(balance, "EUR"),
tolerance=None,
diff_amount=None,
)
)
# skip headings
next(csv_reader)
for index, row in enumerate(csv_reader):
lineno: int = index + 6 # entries start at line 6
meta = data.new_metadata(file.name, lineno)
transaction_date: date = datetime.strptime(
row[INDEX_DATE], "%d/%m/%y"
).date()
label: str = row[INDEX_LABEL]
debit: str = row[INDEX_DEBIT]
credit: str = row[INDEX_CREDIT]
postings: list[data.Posting] = []
if debit:
amount = Decimal(debit.replace(",", "."))
postings.append(
data.Posting(
self.account, Amount(amount, "EUR"), None, None, None, None
)
)
if credit:
amount = Decimal(credit.replace(",", "."))
postings.append(
data.Posting(
self.account, Amount(amount, "EUR"), None, None, None, None
)
)
directives.append(
data.Transaction(
meta,
transaction_date,
self.FLAG,
label,
"",
data.EMPTY_SET,
data.EMPTY_SET,
postings,
)
)
return directives