Overview: An autonomous Reinforcement Learning (PPO) trading agent designed to operate profitably in "extreme survival" conditions: low capital, high spreads, and high commissions. Built with Python, PyTorch, and the MetaTrader 5 API.
The system is built to survive in a $100 USD micro-account trading EURUSD. In this environment, the broker's commission and spread consume up to 80% of the first pip of profit.
The model is optimized to fully utilize a Ryzen 7 5800X for parallel processing (vectorized environments) and an NVIDIA RTX 3080 Ti for accelerated training.
It uses mixed-precision training (torch.cuda.amp) to optimize VRAM usage and accelerate Transformer training. The PPO reward function applies strict penalties to discourage overtrading.
Here is a sample of the core integration logic:
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime
import pytz
class MT5DataLoader:
    """Download OHLC and tick-volume bars for one symbol from MetaTrader 5.

    Creating an instance connects to the MT5 terminal through
    ``mt5.initialize``; the data methods then query that connection.
    """

    def __init__(self, symbol="EURUSD", login=None, password=None, server=None):
        """Connect to the MT5 terminal and remember the symbol to query.

        Args:
            symbol: Instrument name used for every rates request.
            login: Optional account number forwarded to ``mt5.initialize``.
            password: Optional account password forwarded to
                ``mt5.initialize``.
            server: Optional trade-server name forwarded to
                ``mt5.initialize``.

        Raises:
            SystemExit: If the connection to the terminal cannot be
                established (exit status 1).
        """
        self.symbol = symbol

        # Forward only the credentials that were actually supplied, so a call
        # without any of them is still a plain ``mt5.initialize()``.
        credentials = {
            key: value
            for key, value in (
                ("login", login),
                ("password", password),
                ("server", server),
            )
            if value is not None
        }

        # Open the connection to the terminal.
        if not mt5.initialize(**credentials):
            print("Error al inicializar MT5, asegúrate de que la terminal esté abierta o configura login/pass")
            # Exit with a failure status: the ``quit()`` helper reports
            # status 0 and only exists when the ``site`` module is loaded.
            raise SystemExit(1)

        print(f"✅ Conectado a MT5. Buscando datos para: {symbol}")

    def get_data(self, timeframe, n_bars=10000):
        """Download the most recent raw bars for ``self.symbol`` from MT5.

        Args:
            timeframe: An ``mt5.TIMEFRAME_*`` constant.
            n_bars: Number of bars to request, starting at position 0.

        Returns:
            A DataFrame indexed by bar ``time`` with the columns ``open``,
            ``high``, ``low``, ``close`` and ``tick_volume``, or ``None``
            when MT5 returns no data.
        """
        rates = mt5.copy_rates_from_pos(self.symbol, timeframe, 0, n_bars)
        # ``None`` signals an API error; an empty array carries no bars either.
        if rates is None or len(rates) == 0:
            print(f"❌ Error descargando datos para timeframe: {timeframe}")
            return None

        df = pd.DataFrame(rates)
        # ``time`` arrives as integer seconds; the resulting index is naive.
        # NOTE(review): confirm which timezone the broker reports.
        df['time'] = pd.to_datetime(df['time'], unit='s')
        df.set_index('time', inplace=True)
        return df[['open', 'high', 'low', 'close', 'tick_volume']]

    def get_multi_timeframe_data(self, n_bars=50000):
        """Download M1, M5, M15, M30 and H1 bars for ``self.symbol``.

        M1 is the base series and is requested with ``n_bars`` bars. Every
        higher timeframe is requested with ``n_bars // 5`` bars (at least 1)
        and has its columns suffixed with ``_<timeframe>`` (for example
        ``close_H1``) so that the frames can later be combined without
        column-name collisions.

        NOTE(review): this method only downloads and renames. It does not
        join or forward-fill the higher timeframes onto the M1 index; any
        as-of alignment has to be done by the caller.

        Args:
            n_bars: Number of M1 bars to request.

        Returns:
            A dict mapping the timeframe name (``'M1'``, ``'M5'``, ``'M15'``,
            ``'M30'``, ``'H1'``) to its DataFrame, or ``None`` if any
            download fails.
        """
        timeframes = {
            'M1': mt5.TIMEFRAME_M1,
            'M5': mt5.TIMEFRAME_M5,
            'M15': mt5.TIMEFRAME_M15,
            'M30': mt5.TIMEFRAME_M30,
            'H1': mt5.TIMEFRAME_H1,
        }
        data_dict = {}
        print("⏳ Descargando datos masivos (esto puede tardar unos segundos)...")

        # Download the M1 base series first; nothing is useful without it.
        df_m1 = self.get_data(timeframes['M1'], n_bars)
        if df_m1 is None:
            return None
        data_dict['M1'] = df_m1

        # Fewer bars are requested for the larger timeframes (rough
        # adjustment); never ask for zero bars when n_bars is below 5.
        higher_tf_bars = max(1, n_bars // 5)

        for tf_name, tf_code in timeframes.items():
            if tf_name == 'M1':
                continue

            df_tf = self.get_data(tf_code, n_bars=higher_tf_bars)
            if df_tf is None:
                # Same failure contract as the M1 download above, instead of
                # an AttributeError on ``None.add_suffix``.
                return None

            # Rename the columns to avoid collisions between timeframes.
            data_dict[tf_name] = df_tf.add_suffix(f'_{tf_name}')

        print("✅ Descarga completada.")
        return data_dict