Adds the my_code-challenge folder to the repository #20

Open · wants to merge 1 commit into main

116 changes: 116 additions & 0 deletions my_code-challenge/Scripts/extract_and_transfer.py
@@ -0,0 +1,116 @@
import os
import csv
import subprocess
from datetime import datetime

import psycopg2

# Create a directory if it does not already exist
def create_directory(path):
    os.makedirs(path, exist_ok=True)

# Save rows to a CSV file on the local disk
def save_to_local_disk(data, path, file_name):
    create_directory(path)
    file_path = os.path.join(path, file_name)
    with open(file_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    print(f"Data saved to {file_path}")

# Execute SQL commands against the new database
def execute_sql_commands(conn, sql_commands):
    cursor = conn.cursor()
    try:
        cursor.execute(sql_commands)
        conn.commit()
        print("SQL commands executed successfully.")
    except Exception as e:
        print(f"Error executing SQL: {e}")
    finally:
        cursor.close()

# List the tables in 'localdb' that will be copied to the new database
def transfer_tables_to_new_db():
    postgres_config_localdb = {
        "host": "localhost",
        "port": 5432,
        "database": "localdb",
        "user": "Admin",
        "password": "123456"
    }

    # Connect to the 'localdb' database
    conn_localdb = psycopg2.connect(**postgres_config_localdb)
    conn_localdb.autocommit = True  # Allow DDL operations
    cursor_localdb = conn_localdb.cursor()

    # Fetch the names of the tables in 'localdb'
    cursor_localdb.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
    tables = cursor_localdb.fetchall()
    print(f"Tables in localdb: {[table[0] for table in tables]}")

    conn_localdb.close()
    return tables

# Extract the data from the 'localdb' tables, copy it into the new database
# and save the rows as CSV files
def extract_and_save_tables():
    # Connection settings for the source database
    postgres_config_localdb = {
        "host": "localhost",
        "port": 5432,
        "database": "localdb",
        "user": "Admin",
        "password": "123456"
    }

    # Connection settings for the new database (running in Docker)
    postgres_config_new_db = {
        "host": "localhost",
        "port": 5432,
        "database": "northwind",        # Database name as defined in Docker
        "user": "northwind_user",       # User as defined in Docker
        "password": "thewindisblowing"  # Password as defined in Docker
    }

    # Open connections to both databases. PostgreSQL does not support
    # cross-database references such as 'localdb.table', so the rows are
    # read through one connection and inserted through the other.
    conn_localdb = psycopg2.connect(**postgres_config_localdb)
    cursor_localdb = conn_localdb.cursor()
    conn_new_db = psycopg2.connect(**postgres_config_new_db)
    cursor_new_db = conn_new_db.cursor()

    # Current date, used to partition the output directories
    execution_date = datetime.now().strftime("%Y-%m-%d")

    # Tables in 'localdb' to transfer to the new database
    tables = transfer_tables_to_new_db()

    # Copy each table's data into the new database and save the CSVs
    for table in tables:
        table_name = table[0]
        cursor_localdb.execute(f"SELECT * FROM {table_name}")
        rows = cursor_localdb.fetchall()
        headers = [desc[0] for desc in cursor_localdb.description]

        # Recreate the table in the new database with all columns as TEXT,
        # then bulk-insert the rows (cast to str so they match TEXT columns)
        cursor_new_db.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} "
            f"({', '.join(f'{header} TEXT' for header in headers)})"
        )
        if rows:
            placeholders = ', '.join(['%s'] * len(headers))
            text_rows = [tuple(None if v is None else str(v) for v in row) for row in rows]
            cursor_new_db.executemany(f"INSERT INTO {table_name} VALUES ({placeholders})", text_rows)
        conn_new_db.commit()
        print(f"Table {table_name} copied to new database.")

        data = [headers] + rows

        # Save the data as CSV under data/postgres/{table}/{date}/
        save_to_local_disk(data, f"data/postgres/{table_name}/{execution_date}", f"{table_name}.csv")

        # Also save under data/csv/{date}/
        save_to_local_disk(data, f"data/csv/{execution_date}", f"{table_name}.csv")

    # Close the connections
    cursor_localdb.close()
    conn_localdb.close()
    cursor_new_db.close()
    conn_new_db.close()

# Start the Docker container that hosts the new PostgreSQL database
def setup_docker():
    try:
        print("Starting Docker container from docker-compose.yml...")
        subprocess.run(["docker-compose", "-f", "docker-compose.yml", "up", "-d"], check=True)
        print("Docker container started.")
    except subprocess.CalledProcessError as e:
        print(f"Error starting Docker container: {e}")
        return

# Main entry point for the process
def main():
    print("Starting the process...")
    setup_docker()
    extract_and_save_tables()

if __name__ == "__main__":
    main()
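One caveat worth noting on this script: `docker-compose up -d` returns as soon as the container is started, not when PostgreSQL inside it is ready to accept connections, so `extract_and_save_tables()` can fail with a connection error on a cold start. Below is a minimal sketch of a readiness check that could run between `setup_docker()` and the extraction; the `wait_for_postgres` helper, its timeout, and its polling interval are illustrative and not part of this PR.

import time

import psycopg2

# Hypothetical helper: poll until the Postgres container accepts connections
def wait_for_postgres(config, timeout=60, interval=2):
    deadline = time.time() + timeout
    while True:
        try:
            psycopg2.connect(**config).close()
            return  # Connection succeeded; the database is ready
        except psycopg2.OperationalError:
            if time.time() > deadline:
                raise TimeoutError("PostgreSQL did not become ready in time")
            time.sleep(interval)  # Not ready yet; wait and retry

It could be invoked as `wait_for_postgres(postgres_config_new_db)` right after `setup_docker()` in `main()`.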
77 changes: 77 additions & 0 deletions my_code-challenge/Scripts/extract_csv.py
@@ -0,0 +1,77 @@
import os
import csv
from datetime import datetime

import psycopg2
import requests

# Create a directory if it does not already exist
def create_directory(path):
    os.makedirs(path, exist_ok=True)

# Save rows to a CSV file on the local disk
def save_to_local_disk(data, path, file_name):
    create_directory(path)
    file_path = os.path.join(path, file_name)
    with open(file_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    print(f"Data saved to {file_path}")

def write_to_postgres(data, table_name):
    postgres_config = {
        "host": "localhost",
        "port": 5432,
        "database": "localdb",
        "user": "Admin",
        "password": "123456"
    }

    conn = None    # Initialize outside the try block so finally can test them
    cursor = None
    try:
        conn = psycopg2.connect(**postgres_config)
        cursor = conn.cursor()

        # Create the table if it doesn't exist; the CSV headers are assumed
        # to be valid SQL identifiers, since they are interpolated directly
        headers = data[0]
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join([f'{header} TEXT' for header in headers])})")

        # Insert the data rows
        for row in data[1:]:
            placeholders = ', '.join(['%s'] * len(row))
            cursor.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", row)

        conn.commit()
        print(f"Data written to PostgreSQL table {table_name}")

    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")

    finally:
        # Close only what was actually opened
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def extract_csv():
    # URL of the CSV file in the code-challenge GitHub repository
    csv_url = "https://raw.githubusercontent.com/techindicium/code-challenge/main/data/order_details.csv"
    execution_date = datetime.now().strftime("%Y-%m-%d")
    local_path = f"data/csv/{execution_date}"
    file_name = "order_details.csv"  # Actual name of the source file

    # Download and parse the CSV file
    response = requests.get(csv_url)
    response.raise_for_status()

    rows = response.text.splitlines()
    csv_data = list(csv.reader(rows))

    save_to_local_disk(csv_data, local_path, file_name)

    # Use the file name (without the .csv extension) as the table name
    table_name = file_name.replace('.csv', '')
    write_to_postgres(csv_data, table_name)

def main():
    print("Starting CSV extraction...")
    extract_csv()

if __name__ == "__main__":
    main()
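A note on the SQL construction here: `write_to_postgres` interpolates the table name and the CSV headers directly into the query string, which breaks as soon as a header contains a space or a reserved word (and is unsafe for untrusted input). psycopg2 provides a `sql` module for quoting identifiers, which cannot be passed as `%s` parameters. A minimal sketch of how the CREATE TABLE statement could be built with it; the `create_table_safely` helper is illustrative, not part of the PR.

from psycopg2 import sql

# Hypothetical helper: build CREATE TABLE with properly quoted identifiers
def create_table_safely(cursor, table_name, headers):
    # sql.Identifier quotes each name instead of relying on raw f-string
    # interpolation, so odd or reserved column names still work
    query = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
        sql.Identifier(table_name),
        sql.SQL(", ").join(
            sql.SQL("{} TEXT").format(sql.Identifier(h)) for h in headers
        ),
    )
    cursor.execute(query)

The same pattern would apply to the INSERT statement's table name, while the row values stay as `%s` parameters.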
141 changes: 141 additions & 0 deletions my_code-challenge/Scripts/extract_postgres.py
@@ -0,0 +1,141 @@
import os
import csv
from datetime import datetime

import psycopg2
import requests

# Create a directory if it does not already exist
def create_directory(path):
    os.makedirs(path, exist_ok=True)

# Save rows to a CSV file on the local disk
def save_to_local_disk(data, path, file_name):
    create_directory(path)
    file_path = os.path.join(path, file_name)
    with open(file_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerows(data)
    print(f"Data saved to {file_path}")

def clean_sql_commands(sql_commands):
    """Remove problematic lines and invalid metadata from a SQL dump."""
    lines = sql_commands.split("\n")
    cleaned_lines = []
    for line in lines:
        # Skip pg_dump metadata, SQL comments and blank lines
        if line.strip().startswith(("Type:", "Schema:", "Owner:", "--")) or not line.strip():
            continue
        cleaned_lines.append(line)
    return "\n".join(cleaned_lines)

def execute_sql_file(conn, sql_file_path):
    cursor = conn.cursor()
    try:
        with open(sql_file_path, 'r', encoding='utf-8') as file:
            sql_commands = file.read()

        # Clean the SQL commands
        sql_commands = clean_sql_commands(sql_commands)

        # Split into individual statements; this naive split on ';' assumes
        # the dump contains no dollar-quoted function bodies
        commands = [cmd.strip() for cmd in sql_commands.split(";") if cmd.strip()]

        if not commands:
            print("Error: No valid SQL commands found in the file.")
            return

        for command in commands:
            try:
                cursor.execute(command)
            except Exception as e:
                print(f"Error executing command: {command[:50]}... - {e}")

        conn.commit()
        print(f"SQL file {sql_file_path} executed successfully.")

    except Exception as e:
        print(f"Error executing SQL file: {e}")

    finally:
        cursor.close()

def extract_postgres():
    postgres_config = {
        "host": "localhost",
        "port": 5432,
        "database": "localdb",  # Connect to the 'localdb' database
        "user": "Admin",
        "password": "123456"
    }

    execution_date = datetime.now().strftime("%Y-%m-%d")
    sql_url = "https://raw.githubusercontent.com/techindicium/code-challenge/main/data/northwind.sql"
    sql_file_path = "northwind.sql"

    # Download the SQL file from GitHub
    try:
        response = requests.get(sql_url)
        response.raise_for_status()
        with open(sql_file_path, 'w', encoding='utf-8') as file:
            file.write(response.text)
        print(f"SQL file downloaded and saved to {sql_file_path}.")
    except Exception as e:
        print(f"Error downloading SQL file: {e}")
        return

    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**postgres_config)
        conn.autocommit = True  # Allow DDL commands without an explicit commit

        # Create and populate the database objects
        execute_sql_file(conn, sql_file_path)

        # Check which tables were created
        cursor = conn.cursor()
        cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
        tables = cursor.fetchall()

        if not tables:
            print("No tables found in the database. Check the SQL execution.")
            return

        # Confirm the tables created in the database
        print(f"The following tables have been created in the database '{postgres_config['database']}':")
        for table in tables:
            print(f"- {table[0]}")

        # Extract the data from each table and save it to CSV files
        for table in tables:
            table_name = table[0]
            cursor.execute(f"SELECT * FROM {table_name}")
            rows = cursor.fetchall()
            headers = [desc[0] for desc in cursor.description]

            data = [headers] + rows
            save_to_local_disk(data, f"data/postgres/{table_name}/{execution_date}", f"{table_name}.csv")
            print(f"Data from table {table_name} saved to disk.")

        # Drop the 'northwind' database after the extraction, if it exists
        print("Dropping the 'northwind' database to clean up...")
        cursor.execute("DROP DATABASE IF EXISTS northwind")

        # Remove the downloaded SQL file
        if os.path.exists(sql_file_path):
            os.remove(sql_file_path)
            print(f"File {sql_file_path} removed after execution.")

    except Exception as e:
        print(f"Error extracting PostgreSQL data: {e}")

    finally:
        # Close only what was actually opened
        if cursor:
            cursor.close()
        if conn:
            conn.close()

def main():
    print("Starting PostgreSQL extraction...")
    extract_postgres()

if __name__ == "__main__":
    main()
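For larger tables, fetching every row into Python with `fetchall()` and re-serializing it through `csv.writer` is the slow path: PostgreSQL's `COPY` can stream a table straight to a CSV file. A minimal sketch using psycopg2's `copy_expert`; the `export_table_to_csv` helper is illustrative and assumes `table_name` is a trusted identifier, as the scripts above already do.

# Hypothetical helper: stream a table to CSV with COPY instead of fetchall()
def export_table_to_csv(conn, table_name, file_path):
    with conn.cursor() as cursor, open(file_path, "w", encoding="utf-8") as f:
        # COPY writes rows directly to the file handle, so the full result
        # set is never materialized in Python memory
        cursor.copy_expert(f"COPY {table_name} TO STDOUT WITH CSV HEADER", f)

Each `save_to_local_disk(data, ...)` call in the loop could then be replaced by a single `export_table_to_csv(conn, table_name, file_path)` per output path.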