Skip to content

Instantly share code, notes, and snippets.

@jev-odoo
Last active April 15, 2020 12:30
Show Gist options
  • Select an option

  • Save jev-odoo/e529ee4477ee450fd0908c1357a122bc to your computer and use it in GitHub Desktop.

Select an option

Save jev-odoo/e529ee4477ee450fd0908c1357a122bc to your computer and use it in GitHub Desktop.
POS - Split
# FOR VERSION 12 and 12.3
# Set the session ids between the brackets, separated by a comma,
# e.g. SESSION_IDS = [12, 18, 132] or SESSION_IDS = [4]
SESSION_IDS = []
# 'check' rolls every change back at the end of each session (dry run);
# 'process' commits the changes. Must be one of VALID_MODES.
MODE = 'check'
# Keyword inserted in the name of every session created by the split.
SPLIT_KEYWORD = 'Split'
# Maximum number of orders moved into each newly created session.
MAX_ORDER_PER_SESSION = 200
# DO NOT MODIFY
# Version prefixes accepted by check_version() (plain or 'saas-' prefixed).
VALID_VERSIONS = (11, 12,)
# Sessions in any other state are skipped with a warning.
VALID_SESSION_STATES = ('opened', 'closing_control')
VALID_MODES = ('check', 'process')
# Values written in the 'func' and 'name' columns of the ir.logging record.
LOG_LINE_ACTION_NAME = 'POS - Split Huge Session'
LOG_LINE_NAME = 'Support Intervention'
# Note handed to post_note() after a successful split in 'process' mode.
ON_SUCCESS_LOG_NOTE = 'This session had too many orders and has been split by Odoo Support.'
SAVE_LOGS = True # Save logs in ir.logging table accessible from technical menu
RAISE_LOGS = True # Show logs in a popup after the process
# Log entries, as (message, level) tuples, collected since the last
# save_log_lines() call.
PARTIAL_LOGS = []
# Every log entry flushed so far; this is what raise_logs() reports.
FULL_LOGS = []
# Level labels used as the second element of each log entry.
INFO = 'Info'
WARNING = 'Warning'
ERROR = 'Error'
# -----===== HELPERS =====------
def _add_log_line(level, log_message, log_details: list = None):
    """Queue one log entry in PARTIAL_LOGS.

    :param level: level label stored next to the message (INFO, WARNING, ERROR).
    :param log_message: base text of the entry.
    :param log_details: optional list of dicts. Every key/value pair is
        rendered as ``Key: value`` (key title-cased), except the special
        ``counter`` key whose 2-item value is rendered as ``current/total``.
        The rendered pairs are appended to the message between parentheses,
        joined with ``' - '``.
    """
    if log_details:
        rendered = [
            '{}/{}'.format(value[0], value[1]) if key == 'counter'
            else '{}: {}'.format(key.title(), value)
            for detail in log_details
            for key, value in detail.items()
        ]
        log_message = '{} ({})'.format(log_message, ' - '.join(rendered))
    # Entries are stored as (message, level); format_logs() relies on that order.
    PARTIAL_LOGS.append((log_message, level))
def check_mode():
    """Raise ``Warning`` when MODE is not one of VALID_MODES."""
    if MODE in VALID_MODES:
        return
    message = 'Invalid mode. (Actual mode: {}, Valid modes: {})'.format(MODE, iter_to_string(VALID_MODES))
    raise Warning(message)
def check_version(env, versions: float = None):
    """Match the installed ``base`` module version against known versions.

    The ``latest_version`` of the ``base`` module (with ``~`` replaced by
    ``-``) is compared, by prefix, with each candidate version, either as-is
    or prefixed with ``saas-``.

    :param env: environment used to read ``ir.module.module``.
    :param versions: a single version to test; when falsy, every entry of
        VALID_VERSIONS is tested instead.
    :return: the last candidate that matched, or 0 when none did.
    :raises Warning: only when ``versions`` is falsy and nothing matched.
    """
    base_module = env['ir.module.module'].search([('name', '=', 'base')])
    latest_version = base_module.latest_version.replace('~', '-')
    candidates = [versions] if versions else VALID_VERSIONS
    actual_version = 0
    for candidate in candidates:
        text = str(candidate)
        # No early exit: when several candidates match, the last one wins.
        if latest_version.startswith((text, 'saas-%s' % text)):
            actual_version = candidate
    if not (versions or actual_version):
        raise Warning('Invalid version. (Actual version: {} - Valid versions: {})'.format(latest_version, iter_to_string(VALID_VERSIONS)))
    return actual_version
def commit(env, session):
    """Persist or discard the pending changes depending on MODE, then log it.

    In 'process' mode the success note is posted through post_note() and the
    cursor is committed; in any other mode the cursor is rolled back.

    :param env: environment whose cursor is committed or rolled back.
    :param session: session the changes belong to (used for the note and
        the log details).
    """
    processing = MODE == 'process'
    if processing:
        post_note(env, session)
        env.cr.commit()
    else:
        env.cr.rollback()
    commit_message = 'commiting' if processing else 'rollbacking'
    _add_log_line(
        INFO,
        '{} mode detected, {} changes.'.format(MODE, commit_message),
        [{'session_id': session.id}],
    )
def format_logs(logs):
    """Render log entries as text, one ``<level> - <message>`` line per entry.

    :param logs: iterable of entries indexed as ``(message, level)``.
    :return: the rendered lines joined with newlines ('' for no entries).
    """
    return '\n'.join('{} - {}'.format(entry[1], entry[0]) for entry in logs)
def get_session(env, additional_domains: list = None, raise_exception=True):
    """Return the ``pos.session`` records to work on, ordered by ascending id.

    :param env: environment used to search ``pos.session``.
    :param additional_domains: optional extra domain leaves appended to the
        id filter.
    :param raise_exception: when true, an empty SESSION_IDS raises instead
        of searching without an id filter.
    :return: the matching sessions (never empty).
    :raises Warning: when SESSION_IDS is empty and ``raise_exception`` is
        true, or when the search finds nothing.
    """
    session_model = env['pos.session']
    if not SESSION_IDS and raise_exception:
        raise Warning('Please define the session id.')
    domain = [('id', 'in', SESSION_IDS)] if SESSION_IDS else []
    domain.extend(additional_domains or [])
    sessions = session_model.search(domain, order='id ASC')
    if sessions:
        return sessions
    raise Warning('No session found. ids: {}'.format(iter_to_string(SESSION_IDS)))
def iter_to_string(iterable):
    """Return the items of ``iterable`` converted with ``str`` and joined by ``', '``."""
    return ', '.join(map(str, iterable))
def post_note(env, session):
    """Post ON_SUCCESS_LOG_NOTE as a message on ``session`` when applicable.

    Nothing is posted when check_version() matches 12.0 or 11.0, or when the
    note text is empty.

    :param env: environment forwarded to check_version().
    :param session: record on which ``message_post`` is called.
    """
    if check_version(env, 12.0) or check_version(env, 11.0) or not ON_SUCCESS_LOG_NOTE:
        return
    session.message_post(body='{}'.format(ON_SUCCESS_LOG_NOTE))
def raise_logs():
    """Raise a ``Warning`` carrying every entry of FULL_LOGS, if RAISE_LOGS is set."""
    if not RAISE_LOGS:
        return
    raise Warning(format_logs(FULL_LOGS))
def save_log_lines(env):
    """Flush PARTIAL_LOGS: copy it to FULL_LOGS, optionally store it, empty it.

    When SAVE_LOGS is set, the pending entries are written as a single
    ``ir.logging`` record and the cursor is committed right away.

    :param env: environment used to create the ``ir.logging`` record.
    """
    FULL_LOGS.extend(PARTIAL_LOGS)
    if SAVE_LOGS:
        record_values = {
            'create_date': datetime.datetime.now(),
            'create_uid': env.uid,
            'type': 'server',
            'dbname': env.cr.dbname,
            'name': LOG_LINE_NAME,
            'level': 'info',
            'message': format_logs(PARTIAL_LOGS),
            'path': 'action',
            'line': 0,
            'func': LOG_LINE_ACTION_NAME,
        }
        env['ir.logging'].create(record_values)
        env.cr.commit()
    # Empty the list in place so that other references to it see the reset.
    del PARTIAL_LOGS[:]
# -----===== SPECIFIC FUNCTIONS =====------
def specific_function(env, session):
    """Move every order of ``session`` into new rescue sessions, then close it.

    Orders are moved in batches of at most MAX_ORDER_PER_SESSION, lowest ids
    first. For each batch a new 'opened' rescue session is created with the
    same config and user; for each bank statement of the original session
    that has lines belonging to the batch, a new statement is created on the
    same journal, those lines are moved to it and the real ending balances
    of both statements are adjusted accordingly. Finally the closing control
    action is called on the original session.

    Nothing is committed here; the caller decides to commit or roll back.

    :param env: environment used to reach the models.
    :param session: the ``pos.session`` record to split.
    :return: True on success, None when any step raised (the error is logged).
    """
    number_of_orders = len(session.order_ids)
    # Ceiling division: exactly as many sessions as needed to hold all orders.
    # (Using ``1 + n // max`` would create one extra, empty session whenever
    # the order count is an exact multiple of the maximum.)
    number_of_split_session = (number_of_orders + MAX_ORDER_PER_SESSION - 1) // MAX_ORDER_PER_SESSION
    start_at = datetime.datetime.utcnow()
    try:
        for i in range(number_of_split_session):
            # Create new session
            # Orders already moved no longer match the domain, so each pass
            # picks up the next batch still attached to the original session.
            orders = env['pos.order'].sudo().search([('session_id', '=', session.id)], limit=MAX_ORDER_PER_SESSION, order='id ASC')
            session_values = {
                'name': '- {} {}/{} for session {}'.format(SPLIT_KEYWORD, i + 1, number_of_split_session, session.name),
                'config_id': session.config_id.id,
                'user_id': session.user_id.id,
                'start_at': start_at,
                'state': 'opened',
                'rescue': True,
            }
            split_session = session.sudo().create(session_values)
            orders.sudo().write({'session_id': split_session.id})
            # Create new bank_statements and associate according bank_statement_lines for the newly created session
            statements = env['account.bank.statement']
            # The payment lines of the batch do not depend on the statement
            # being handled, so they are fetched once per batch.
            order_statement_lines = orders.mapped('statement_ids')
            for statement in session.statement_ids:
                statement_lines = env['account.bank.statement.line']
                for sl in order_statement_lines:
                    if sl.statement_id.id == statement.id:
                        statement_lines |= sl
                if statement_lines:
                    statement_value = {
                        'name': split_session.name,
                        'journal_id': statement.journal_id.id,
                        # (6, _, ids): set the lines of the new statement to exactly these records
                        'line_ids': [(6, False, statement_lines.ids)]
                    }
                    # Use sudo in case the user has no rights to create or write on account.bank.statement or account.bank.statement.line
                    new_statement = session.statement_ids.sudo().create(statement_value)
                    statements += new_statement
                    new_statement.sudo().write({
                        'balance_end_real': new_statement.balance_end
                    })
                    # What moved to the new statement is removed from the original one.
                    statement.sudo().write({
                        'balance_end_real': statement.balance_end_real - new_statement.balance_end
                    })
            split_session.sudo().write({
                'statement_ids': [(6, False, statements.ids)]
            })
            log_message = 'Splitted session successfully created.'
            log_details = [{'session_id': split_session.id},
                           {'counter': (i + 1, number_of_split_session)}]
            _add_log_line(INFO, log_message, log_details)
        session.action_pos_session_closing_control()
        log_message = 'Session successfully closed.'
        log_details = [{'session_id': session.id}]
        _add_log_line(INFO, log_message, log_details)
        return True
    except Exception as e:
        # Best effort: report the failure and let the caller roll back.
        log_message = 'Error splitting session.'
        log_details = [{'session_id': session.id},
                       {'error': str(e)}]
        _add_log_line(ERROR, log_message, log_details)
        return None
# -----===== MAIN PROCESS =====------
def process(env):
    """Run the split on every configured session and report the outcome.

    After validating MODE and the database version, each session returned by
    get_session() is handled on its own: sessions in an invalid state are
    skipped with a warning, the others are split and then committed or
    rolled back. The log entries are flushed after every session, skipped
    ones included, so that no entry is missing from the saved logs or from
    the final report raised by raise_logs().

    :param env: environment used for every database access.
    :raises Warning: on invalid configuration, or at the end with the full
        log when RAISE_LOGS is set.
    """
    check_mode()
    check_version(env)
    additional_domain = []
    sessions = get_session(env, additional_domain)
    for session in sessions:
        if session.state not in VALID_SESSION_STATES:
            log_message = 'Invalid session state, skipping.'
            log_details = [{'session_id': session.id},
                           {'actual state': session.state},
                           {'valid states': iter_to_string(VALID_SESSION_STATES)}]
            _add_log_line(WARNING, log_message, log_details)
        else:
            try:
                if specific_function(env, session):
                    commit(env, session)
                else:
                    env.cr.rollback()
            except Exception as e:
                log_message = 'Unexpected Error occurred.'
                log_details = [{'session_id': session.id},
                               {'error': str(e)}]
                _add_log_line(ERROR, log_message, log_details)
                env.cr.rollback()
        # Flush once the transaction is settled, also for skipped sessions:
        # otherwise the warning of a trailing skipped session would never
        # reach FULL_LOGS nor the ir.logging table.
        save_log_lines(env)
    raise_logs()
# Entry point: run the whole procedure. `env` is not defined in this script;
# presumably it is provided by the Odoo server action evaluation context.
process(env)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment