Mage AI example pipeline
You should read this first before beginning this note.
This note is an example of creating a pipeline that reads from a source database and then writes to a destination database; another branch writes an Excel file and sends an email with the Excel file attached.
First, create a new pipeline.
Configure the connections in io_config.yaml:
# Placeholder credentials below - replace them with your own values and
# do not commit real passwords.
mysource:
  # PostgreSQL (profile used by the data loader: config_profile = 'mysource')
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: src_db
  POSTGRES_SCHEMA: public
  POSTGRES_USER: mageai
  POSTGRES_PASSWORD: password
  POSTGRES_HOST: YOUR_POSTGRESQL_SOURCE_IP
  POSTGRES_PORT: 5432
mydest:
  # PostgreSQL (profile used by the data exporter: config_profile = 'mydest')
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: dest_db
  POSTGRES_SCHEMA: public
  POSTGRES_USER: mageai
  POSTGRES_PASSWORD: password
  POSTGRES_HOST: YOUR_POSTGRESQL_DEST_IP
  POSTGRES_PORT: 5432
Add xlsxwriter to requirements.txt
xlsxwriter
Then restart Mage AI. If you run it in Docker, restart your Docker container.
Next, create a data loader to load data from the PostgreSQL database into a DataFrame.
Create Data loader > Python > PostgreSQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Run a SELECT against the source PostgreSQL database and return the result.

    Connection settings come from the 'mysource' profile of 'io_config.yaml'
    in the Mage repo root.
    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    profile_name = 'mysource'
    settings_file = path.join(get_repo_path(), 'io_config.yaml')
    sql = 'SELECT * FROM companies'  # Specify your SQL query here

    source_config = ConfigFileLoader(settings_file, profile_name)
    with Postgres.with_config(source_config) as pg:
        # Returning inside the `with` still closes the connection on exit.
        return pg.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Sanity check for the block: fail when the block produced no output.
    """
    output_missing = output is None
    assert not output_missing, 'The output is undefined'
Next, create a data exporter to take the data from the DataFrame and export it to the PostgreSQL database.
Create Data exporter > Python > PostgreSQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter
@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> DataFrame:
    """
    Write the incoming DataFrame to the destination PostgreSQL database.

    Connection settings come from the 'mydest' profile of 'io_config.yaml'
    in the Mage repo root. The same DataFrame is returned unchanged after
    the export.
    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    target_schema = 'public'  # Specify the name of the schema to export data to
    target_table = 'companies'  # Specify the name of the table to export data to
    profile_name = 'mydest'
    settings_file = path.join(get_repo_path(), 'io_config.yaml')

    dest_config = ConfigFileLoader(settings_file, profile_name)
    with Postgres.with_config(dest_config) as pg:
        pg.export(
            df,
            target_schema,
            target_table,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
    return df
Next, create a utility to send email. Create a new file in the utils folder (for example utils/send_email.py, to match the import used by the exporter below).
import smtplib,ssl
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import formatdate
from email import encoders
def send_email_wt_attach(mail_subject,
                         mail_login,
                         mail_password,
                         mail_from,
                         mail_to,
                         mail_body,
                         attach_path,
                         smtp_host,
                         smtp_port):
    """Send a plain-text email with one file attached, over SMTP with STARTTLS.

    Args:
        mail_subject: Value of the Subject header.
        mail_login: User name passed to ``smtp.login``.
        mail_password: Password passed to ``smtp.login``.
        mail_from: Value of the From header.
        mail_to: Recipient address, or an iterable of addresses; they are
            joined with ', ' into the To header.
        mail_body: Text of the message body.
        attach_path: Path of the file to attach. Only its base name is
            exposed to the recipient as the attachment file name.
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port, passed through to ``smtplib.SMTP``.

    Raises:
        OSError: If ``attach_path`` cannot be opened. The file is read
            before any connection is made, so nothing is sent in that case.
        smtplib.SMTPException: On STARTTLS, login or delivery failures.
    """
    # Local import so this function does not depend on edits to the
    # module-level import block.
    from os.path import basename

    # A bare string would otherwise be joined character by character.
    recipients = [mail_to] if isinstance(mail_to, str) else list(mail_to)

    msg = MIMEMultipart()
    # header
    msg["Subject"] = mail_subject
    msg["From"] = mail_from
    msg["To"] = ', '.join(recipients)
    msg["Date"] = formatdate(localtime=True)
    # body
    msg.attach(MIMEText(mail_body))

    # attachment: read via a context manager so the handle is always closed
    with open(attach_path, 'rb') as attachment:
        payload = attachment.read()
    part = MIMEBase('application', 'octet-stream')
    part.set_payload(payload)
    encoders.encode_base64(part)
    # Use the base name only, so local directory names are not leaked.
    part.add_header('Content-Disposition', 'attachment',
                    filename=basename(attach_path))
    msg.attach(part)

    # The default context verifies the server certificate and host name;
    # it must be handed to starttls() to take effect.
    context = ssl.create_default_context()
    # send email
    with smtplib.SMTP(smtp_host, port=smtp_port) as smtp:
        smtp.starttls(context=context)
        smtp.login(mail_login, mail_password)
        smtp.send_message(msg)
Next, create Data exporter > Python > Local file.
from mage_ai.io.file import FileIO
from pandas import DataFrame
import pandas as pd
from magetest.utils.send_email import send_email_wt_attach
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter
def _write_formatted_workbook(df, filename, sheetname):
    """Write ``df`` to ``filename`` as a single formatted Excel sheet."""
    # The context manager closes (and thereby saves) the workbook even if
    # one of the formatting calls below raises.
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        # Convert the dataframe to an XlsxWriter Excel object.
        df.to_excel(writer, sheet_name=sheetname, index=False)
        workbook = writer.book
        worksheet = writer.sheets[sheetname]

        number_format = workbook.add_format({'num_format': '#', 'border': 1})
        text_format = workbook.add_format({'num_format': '@', 'border': 1})
        header_format = workbook.add_format({
            'bold': True,
            'text_wrap': True,
            'valign': 'top',
            'fg_color': '#D7E4BC',
            'border': 1})

        # Fixed widths for the first five columns.
        for column_range, width in (('A:A', 5), ('B:B', 20), ('C:C', 20),
                                    ('D:D', 20), ('E:E', 10)):
            worksheet.set_column(column_range, width)

        # Write the column headers with the defined format.
        for col_num, value in enumerate(df.columns.values):
            worksheet.write(0, col_num, value, header_format)

        # Choose each column's format once: the 'id' column is numeric,
        # every other column is written as text.
        column_formats = [number_format if name == 'id' else text_format
                          for name in df.columns]

        # Rewrite the non-null cells so they carry the bordered formats;
        # null cells stay as to_excel() left them.
        for row in range(1, len(df) + 1):
            for col, cell_format in enumerate(column_formats):
                value = df.iloc[row - 1, col]
                if pd.notna(value):
                    worksheet.write(row, col, value, cell_format)


@data_exporter
def export_data_to_file(df: DataFrame, **kwargs) -> None:
    """
    Export the DataFrame to 'export_excel.xlsx' and email it as an attachment.

    The rows are sorted by index, written to sheet 'sheet1' with header and
    cell formatting, and the resulting file is sent with
    ``send_email_wt_attach``.
    Docs: https://docs.mage.ai/design/data-loading#fileio
    """
    filename = 'export_excel.xlsx'
    sheetname = 'sheet1'

    _write_formatted_workbook(df.sort_index(), filename, sheetname)

    # NOTE(review): the credentials below are placeholders. Replace them
    # with your own and avoid committing a real password in the block.
    send_email_wt_attach('Test email MageAI',
                         'YOUR_FROM_EMAIL@gmail.com',
                         'YOUR_EMAIL_PASSWORD',
                         'YOUR_FROM_EMAIL@gmail.com',
                         ['YOUR_TO_EMAIL@gmail.com'],
                         'Test Body for MageAI',
                         filename,
                         'smtp.gmail.com',
                         '587'
                         )
Your pipeline will look like this.
Run your pipeline, then check the data in your database and check your email.