diff --git a/my_code-challenge/Scripts/extract_and_transfer.py b/my_code-challenge/Scripts/extract_and_transfer.py new file mode 100644 index 0000000000..1bf805ae1c --- /dev/null +++ b/my_code-challenge/Scripts/extract_and_transfer.py @@ -0,0 +1,116 @@ +import os +import csv +import psycopg2 +from datetime import datetime +import subprocess +import requests + +# Função para criar diretórios se não existirem +def create_directory(path): + os.makedirs(path, exist_ok=True) + +# Função para salvar os dados em arquivos CSV +def save_to_local_disk(data, path, file_name): + create_directory(path) + file_path = os.path.join(path, file_name) + with open(file_path, 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + print(f"Data saved to {file_path}") + +# Função para executar comandos SQL no novo banco de dados +def execute_sql_commands(conn, sql_commands): + try: + cursor = conn.cursor() + cursor.execute(sql_commands) + conn.commit() + print("SQL commands executed successfully.") + except Exception as e: + print(f"Error executing SQL: {e}") + finally: + cursor.close() + +# Função para extrair tabelas de 'localdb' e copiar para o novo banco +def transfer_tables_to_new_db(): + postgres_config_localdb = { + "host": "localhost", + "port": 5432, + "database": "localdb", + "user": "Admin", + "password": "123456" + } + + # Conexão com o banco de dados 'localdb' + conn_localdb = psycopg2.connect(**postgres_config_localdb) + conn_localdb.autocommit = True # Para permitir operações DDL + cursor_localdb = conn_localdb.cursor() + + # Obter os nomes das tabelas em 'localdb' + cursor_localdb.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") + tables = cursor_localdb.fetchall() + print(f"Tables in localdb: {[table[0] for table in tables]}") + + conn_localdb.close() + return tables + +# Função para extrair dados das tabelas e salvar no novo banco de dados +def extract_and_save_tables(): + # Configurações de conexão para o novo banco de dados (via Docker) + postgres_config_new_db = { + "host": "localhost", + "port": 5432, + "database": "northwind", # Nome do banco de dados conforme definido no Docker + "user": "northwind_user", # Nome do usuário conforme definido no Docker + "password": "thewindisblowing" # Senha conforme definida no Docker + } + + # Estabelecendo conexão com o novo banco de dados + conn_new_db = psycopg2.connect(**postgres_config_new_db) + cursor_new_db = conn_new_db.cursor() + + # Obter a data atual + execution_date = datetime.now().strftime("%Y-%m-%d") + + # Obter as tabelas de 'localdb' para transferir para o novo banco + tables = transfer_tables_to_new_db() + + # Copiar os dados das tabelas para o novo banco e salvar os CSVs + for table in tables: + table_name = table[0] + cursor_new_db.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS TABLE localdb.{table_name}") + print(f"Table {table_name} copied to new database.") + + # Agora, extraímos os dados e salvamos como CSV + cursor_new_db.execute(f"SELECT * FROM {table_name}") + rows = cursor_new_db.fetchall() + headers = [desc[0] for desc in cursor_new_db.description] + data = [headers] + rows + + # Salvar os dados como CSV em /data/postgres/{table}/{date}/file.format + save_to_local_disk(data, f"data/postgres/{table_name}/{execution_date}", f"{table_name}.csv") + + # Salvar também em /data/csv/{date}/file.format + save_to_local_disk(data, f"data/csv/{execution_date}", f"{table_name}.csv") + + # Finalizando a conexão + cursor_new_db.close() + conn_new_db.close() + +# Função para inicializar o Docker e configurar o PostgreSQL +def setup_docker(): + try: + print("Starting Docker container from docker-compose.yml...") + subprocess.run(["docker-compose", "-f", "docker-compose.yml", "up", "-d"], check=True) + print("Docker container started.") + except subprocess.CalledProcessError as e: + print(f"Error starting Docker container: {e}") + return + +# Função principal para executar o processo +def main(): + print("Starting the process...") + setup_docker() + extract_and_save_tables() + +if __name__ == "__main__": + main() diff --git a/my_code-challenge/Scripts/extract_csv.py b/my_code-challenge/Scripts/extract_csv.py new file mode 100644 index 0000000000..5120bd44a0 --- /dev/null +++ b/my_code-challenge/Scripts/extract_csv.py @@ -0,0 +1,77 @@ +import os +import csv +import psycopg2 +from datetime import datetime +import requests + +def create_directory(path): + os.makedirs(path, exist_ok=True) + +def save_to_local_disk(data, path, file_name): + create_directory(path) + file_path = os.path.join(path, file_name) + with open(file_path, 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + print(f"Data saved to {file_path}") + +def write_to_postgres(data, table_name): + postgres_config = { + "host": "localhost", + "port": 5432, + "database": "localdb", + "user": "Admin", + "password": "123456" + } + + conn = None # Inicialize a variável conn fora do bloco try + try: + conn = psycopg2.connect(**postgres_config) + cursor = conn.cursor() + + # Create table if it doesn't exist + headers = data[0] + cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join([f'{header} TEXT' for header in headers])})") + + # Insert data + for row in data[1:]: + placeholders = ', '.join(['%s'] * len(row)) + cursor.execute(f"INSERT INTO {table_name} VALUES ({placeholders})", row) + + conn.commit() + print(f"Data written to PostgreSQL table {table_name}") + + except Exception as e: + print(f"Error writing to PostgreSQL: {e}") + + finally: + if conn: # Verifique se conn foi definida antes de tentar fechar + cursor.close() + conn.close() + +def extract_csv(): + # URL correto para baixar o arquivo CSV do GitHub + csv_url = "https://raw.githubusercontent.com/techindicium/code-challenge/main/data/order_details.csv" + execution_date = datetime.now().strftime("%Y-%m-%d") + local_path = f"data/csv/{execution_date}" + file_name = "order_details.csv" # Alterando para o nome real do arquivo + + # Download and process the CSV file + response = requests.get(csv_url) + response.raise_for_status() + + rows = response.text.splitlines() + csv_data = list(csv.reader(rows)) + + save_to_local_disk(csv_data, local_path, file_name) + + # Usar o nome real do arquivo (sem a extensão) como o nome da tabela + table_name = file_name.replace('.csv', '') # Remove a extensão .csv + write_to_postgres(csv_data, table_name) + +def main(): + print("Starting CSV extraction...") + extract_csv() + +if __name__ == "__main__": + main() diff --git a/my_code-challenge/Scripts/extract_postgres.py b/my_code-challenge/Scripts/extract_postgres.py new file mode 100644 index 0000000000..2590ddb45e --- /dev/null +++ b/my_code-challenge/Scripts/extract_postgres.py @@ -0,0 +1,141 @@ +import os +import csv +import psycopg2 +from datetime import datetime +import requests + +def create_directory(path): + os.makedirs(path, exist_ok=True) + +def save_to_local_disk(data, path, file_name): + create_directory(path) + file_path = os.path.join(path, file_name) + with open(file_path, 'w', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + print(f"Data saved to {file_path}") + +def clean_sql_commands(sql_commands): + """Remove linhas problemáticas e metadados inválidos.""" + lines = sql_commands.split("\n") + cleaned_lines = [] + for line in lines: + # Ignorar metadados e linhas inválidas + if line.strip().startswith(("Type:", "Schema:", "Owner:", "--")) or not line.strip(): + continue + cleaned_lines.append(line) + return "\n".join(cleaned_lines) + +def execute_sql_file(conn, sql_file_path): + try: + cursor = conn.cursor() + with open(sql_file_path, 'r', encoding='utf-8') as file: + sql_commands = file.read() + + # Limpar comandos SQL + sql_commands = clean_sql_commands(sql_commands) + + # Separar comandos SQL + commands = [cmd.strip() for cmd in sql_commands.split(";") if cmd.strip()] + + if not commands: + print("Error: No valid SQL commands found in the file.") + return + + for command in commands: + try: + cursor.execute(command) + except Exception as e: + print(f"Error executing command: {command[:50]}... - {e}") + + conn.commit() + print(f"SQL file {sql_file_path} executed successfully.") + + except Exception as e: + print(f"Error executing SQL file: {e}") + + finally: + if cursor: + cursor.close() + +def extract_postgres(): + postgres_config = { + "host": "localhost", + "port": 5432, + "database": "localdb", # Conectando ao banco 'localdb' + "user": "Admin", + "password": "123456" + } + + execution_date = datetime.now().strftime("%Y-%m-%d") + sql_url = "https://raw.githubusercontent.com/techindicium/code-challenge/main/data/northwind.sql" + sql_file_path = "northwind.sql" + + # Baixar o arquivo SQL do GitHub + try: + response = requests.get(sql_url) + response.raise_for_status() + with open(sql_file_path, 'w', encoding='utf-8') as file: + file.write(response.text) + print(f"SQL file downloaded and saved to {sql_file_path}.") + except Exception as e: + print(f"Error downloading SQL file: {e}") + return + + conn = None + try: + conn = psycopg2.connect(**postgres_config) + conn.autocommit = True # Permite execução de comandos DDL sem necessidade de commit + + # Criar e popular o banco + execute_sql_file(conn, sql_file_path) + + # Verificar tabelas criadas + cursor = conn.cursor() + cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'") + tables = cursor.fetchall() + + if not tables: + print("No tables found in the database. Check the SQL execution.") + return + + # Confirmar tabelas criadas no banco de dados + print(f"The following tables have been created in the database '{postgres_config['database']}':") + for table in tables: + print(f"- {table[0]}") + + # Extrair dados das tabelas e salvar em arquivos CSV + for table in tables: + table_name = table[0] + table_query = f"SELECT * FROM {table_name}" + cursor.execute(table_query) + rows = cursor.fetchall() + headers = [desc[0] for desc in cursor.description] + + data = [headers] + rows + save_to_local_disk(data, f"data/postgres/{table_name}/{execution_date}", f"{table_name}.csv") + print(f"Data from table {table_name} saved to disk.") + + # Excluir o banco de dados após extração + print(f"Dropping the 'northwind' database to clean up...") + cursor.execute("DROP DATABASE IF EXISTS northwind") + + # Remover o arquivo SQL após a execução + if os.path.exists(sql_file_path): + os.remove(sql_file_path) + print(f"File {sql_file_path} removed after execution.") + + except Exception as e: + print(f"Error extracting PostgreSQL data: {e}") + + finally: + if conn: + cursor.close() + conn.close() + +def main(): + print("Starting PostgreSQL extraction...") + extract_postgres() + +if __name__ == "__main__": + main() diff --git a/my_code-challenge/setup.sh b/my_code-challenge/setup.sh new file mode 100755 index 0000000000..fdca5bb13f --- /dev/null +++ b/my_code-challenge/setup.sh @@ -0,0 +1,153 @@ +#!/bin/bash + +set -e # Terminar o script imediatamente se um comando falhar +set -o pipefail # Propagar falhas em pipes + +# Função para exibir mensagens +log() { + echo "****** $1" +} + +# Atualizar e atualizar listas de pacotes +log "Atualizando lista de pacotes..." +sudo apt update && sudo apt upgrade -y +log "Lista de pacotes atualizada com sucesso!" + +# Instalar dependências básicas (incluindo curl) +log "Instalando dependências básicas (curl, etc.)..." +sudo apt install -y curl software-properties-common +log "Dependências básicas instaladas com sucesso!" + +# Instalar PostgreSQL +log "Instalando PostgreSQL..." +sudo apt install -y postgresql postgresql-contrib +log "PostgreSQL instalado com sucesso!" + +log "Iniciando e habilitando o serviço PostgreSQL..." +sudo systemctl start postgresql +sudo systemctl enable postgresql + +log "Verificando status do serviço PostgreSQL..." +if sudo systemctl is-active --quiet postgresql; then + log "Serviço PostgreSQL ativo!" +else + log "Erro: Serviço PostgreSQL inativo." + exit 1 +fi + +# Verificar se o banco de dados LocalDB já existe +log "Verificando se o banco de dados LocalDB já existe..." +DB_EXISTS=$(sudo -u postgres psql -tAc "SELECT 1 FROM pg_database WHERE datname='localdb';") + +if [ "$DB_EXISTS" == "1" ]; then + log "Banco de dados LocalDB já existe. Continuando..." +else + log "Criando banco de dados LocalDB..." + sudo -u postgres psql -c "CREATE DATABASE localdb;" + log "Banco de dados LocalDB criado com sucesso!" +fi + +# Instalar Python 3.9 +log "Instalando Python 3.9..." +sudo add-apt-repository -y ppa:deadsnakes/ppa +sudo apt update +sudo apt install -y python3.9 python3.9-distutils +log "Python 3.9 instalado com sucesso!" + +log "Verificando versão do Python..." +python3.9 --version || (log "Erro: Python 3.9 não foi instalado corretamente."; exit 1) + +log "Instalando pip para Python 3.9..." +curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py +python3.9 get-pip.py +rm get-pip.py +log "pip instalado com sucesso!" + +log "Verificando versão do pip..." +pip3 --version || (log "Erro: pip não foi instalado corretamente."; exit 1) + +# Instalar Docker +log "Atualizando lista de pacotes para Docker..." +sudo apt update && sudo apt upgrade -y +log "Lista de pacotes atualizada!" + +log "Instalando pré-requisitos do Docker..." +sudo apt install -y apt-transport-https ca-certificates curl software-properties-common + +log "Adicionando chave GPG do Docker..." +curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - + +log "Adicionando repositório do Docker..." +sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" + +log "Atualizando lista de pacotes novamente..." +sudo apt update + +log "Instalando Docker..." +sudo apt install -y docker-ce +log "Docker instalado com sucesso!" + +log "Verificando versão do Docker..." +sudo docker --version || (log "Erro: Docker não foi instalado corretamente."; exit 1) + +log "Adicionando usuário ao grupo Docker..." +sudo usermod -aG docker $USER +log "Usuário adicionado ao grupo Docker com sucesso! Reinicie a sessão para aplicar alterações." + +# Instalar Airflow seguindo as instruções oficiais +log "Instalando dependências do Airflow..." +sudo apt install -y libpq-dev python3.9-dev + +log "Criando ambiente virtual para Airflow..." +python3.9 -m venv airflow_env +source airflow_env/bin/activate + +log "Atualizando pip no ambiente virtual..." +pip install --upgrade pip + +log "Instalando Apache Airflow com PostgreSQL support..." +AIRFLOW_VERSION=2.6.3 +PYTHON_VERSION="$(python3.9 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" +CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" +pip install "apache-airflow[postgres]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" + +# Corrigir conflitos de dependências com versões específicas +log "Instalando versões compatíveis de SQLAlchemy e Packaging..." +pip cache purge # Limpa o cache do pip +pip install --force-reinstall "sqlalchemy==1.4.43" "packaging==21.3" + +log "Configurando o banco de dados do Airflow..." +export AIRFLOW_HOME=$(pwd)/airflow_home +airflow db init + +log "Criando usuário administrador do Airflow..." +airflow users create \ + --username admin \ + --firstname Admin \ + --lastname Admin \ + --role Admin \ + --email admin@example.com \ + --password admin + +log "Iniciando servidor web do Airflow..." +airflow webserver -D + +log "Iniciando agendador do Airflow..." +airflow scheduler -D + +log "Airflow configurado! Acesse http://localhost:8080 para usar." + +# Instalar Meltano +log "Instalando Meltano..." +pip install meltano + +log "Inicializando projeto Meltano..." +meltano init Indicium_Project +if [ $? -eq 0 ]; then + log "Projeto Meltano inicializado com sucesso!" +else + log "Erro: Falha ao inicializar o projeto Meltano." + exit 1 +fi + +log "Instalação concluída! Projeto Meltano inicializado com sucesso."