Skip to content

Commit

Permalink
Merge pull request #71 from wey-gu/support_parquet
Browse files Browse the repository at this point in the history
feat: support parquet file now.
  • Loading branch information
wey-gu authored Aug 7, 2024
2 parents e26d07a + febef99 commit 2c9f116
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ For a more comprehensive guide on how to get started with `jupyter_nebulagraph`,
| Feature | Cheat Sheet | Example | Command Documentation |
| ------- | ----------- | --------- | ---------------------- |
| Connect | `%ngql --address 127.0.0.1 --port 9669 --user user --password password` | [Connect](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#connect-to-nebulagraph) | [`%ngql`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ngql/#connect-to-nebulagraph) |
| Load Data from CSV | `%ng_load --source actor.csv --tag player --vid 0 --props 1:name,2:age --space basketballplayer` | [Load Data](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#load-data-from-csv) | [`%ng_load`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_load/) |
| Load Data from CSV or Parquet | `%ng_load --source actor.csv --tag player --vid 0 --props 1:name,2:age --space basketballplayer` | [Load Data](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#load-data-from-csv) | [`%ng_load`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_load/) |
| Query Execution | `%ngql MATCH p=(v:player{name:"Tim Duncan"})-->(v2:player) RETURN p;`| [Query Execution](https://jupyter-nebulagraph.readthedocs.io/en/stable/get_started_docs/#query) | [`%ngql` or `%%ngql`(multi-line)](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ngql/#make-queries) |
| Result Visualization | `%ng_draw` | [Draw Graph](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw/) | [`%ng_draw`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw/) |
| Draw Schema | `%ng_draw_schema` | [Draw Schema](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw_schema/) | [`%ng_draw_schema`](https://jupyter-nebulagraph.readthedocs.io/en/stable/magic_words/ng_draw_schema/) |
Expand Down
6 changes: 3 additions & 3 deletions docs/magic_words/ng_load.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Load Data from CSV
## Load Data from a CSV or Parquet File

It's supported to load data from a CSV file into NebulaGraph with the help of `ng_load_csv` magic.
Loading data from a CSV or Parquet file into NebulaGraph is supported with the help of the `%ng_load` magic.

### Examples

Expand Down Expand Up @@ -58,7 +58,7 @@ Some other examples:

### Usage

```
```python
%ng_load --source <source> [--header] --space <space> [--tag <tag>] [--vid <vid>] [--edge <edge>] [--src <src>] [--dst <dst>] [--rank <rank>] [--props <props>] [-b <batch>] [--limit <limit>]
```

Expand Down
8 changes: 6 additions & 2 deletions ngql/magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ def _execute(self, query):
if self.space is not None: # Always use space automatically
session.execute(f"USE { self.space }")
result = session.execute(query)
assert result.is_succeeded(), f"Query Failed:\n { result.error_msg() }"
assert (
result.is_succeeded()
), f"Query Failed:\n { result.error_msg() }\n Query:\n { query }"
self._remember_space(result)
except Exception as e:
fancy_print(f"[ERROR]:\n { e }", color="red")
Expand Down Expand Up @@ -514,7 +516,9 @@ def ng_draw(self, line, cell=None, local_ns={}):

# Update node sizes based on PageRank scores
for node_id, score in pagerank_scores.items():
normalized_size = 10 + score * 90 # Reduced multiplier for smaller size normalization
normalized_size = (
10 + score * 90
) # Reduced multiplier for smaller size normalization
g.get_node(node_id)["size"] = min(normalized_size, 80)
except Exception as e:
fancy_print(
Expand Down
63 changes: 48 additions & 15 deletions ngql/ng_load.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import requests
import pandas as pd
from io import StringIO
from io import BytesIO, StringIO
from typing import Callable

from nebula3.data.ResultSet import ResultSet
Expand Down Expand Up @@ -69,25 +69,51 @@ def ng_load(execute_fn: Callable[[str], ResultSet], args: LoadDataArgsModel):

limit = args.limit

# Function to safely load CSV with limit
def safe_load_csv(source, header_option, limit=None):
temp_df = pd.read_csv(source, header=header_option)
# Function to safely load CSV or Parquet with limit
def safe_load_file(source, file_type, header_option=None, limit=None):
if file_type == "csv":
temp_df = pd.read_csv(source, header=header_option)
elif file_type == "parquet":
temp_df = pd.read_parquet(source)
else:
raise ValueError(f"Unsupported file type: {file_type}")

if isinstance(limit, int) and limit > 0:
return temp_df.head(limit)
return temp_df

# Load CSV from file or URL
# Determine file type based on source extension
file_type = (
"csv"
if args.source.lower().endswith(".csv")
else "parquet"
if args.source.lower().endswith(".parquet")
else None
)
if file_type is None:
raise ValueError(
"Unsupported file type. Please use either CSV or Parquet files."
)

# Load file from URL or local path
if args.source.startswith("http://") or args.source.startswith("https://"):
response = requests.get(args.source)
csv_string = response.content.decode("utf-8")
df = safe_load_csv(
StringIO(csv_string),
if file_type == "csv":
file_content = StringIO(response.content.decode("utf-8"))
else: # parquet
file_content = BytesIO(response.content)
df = safe_load_file(
file_content,
file_type,
header_option=0 if with_header else None,
limit=limit,
)
else:
df = safe_load_csv(
args.source, header_option=0 if with_header else None, limit=limit
df = safe_load_file(
args.source,
file_type,
header_option=0 if with_header else None,
limit=limit,
)

# Build schema type map for tag or edge type
Expand Down Expand Up @@ -217,7 +243,9 @@ def safe_load_csv(source, header_option, limit=None):
else:
query = f"INSERT VERTEX `{args.tag}` (`{'`, `'.join(prop_columns)}`) VALUES "
for index, row in batch.iterrows():
vid_str = f'{QUOTE_VID}{row["___vid"]}{QUOTE_VID}'
raw_vid_str = row["___vid"].strip('"').replace('"', '\\"')
vid_str = f"{QUOTE_VID}{raw_vid_str}{QUOTE_VID}"

prop_str = ""
if with_props:
for prop_name in prop_columns:
Expand All @@ -230,7 +258,8 @@ def safe_load_csv(source, header_option, limit=None):
)
prop_str += "NULL, "
elif prop_schema_map[prop_name]["type"] == "string":
prop_str += f"{QUOTE}{prop_value}{QUOTE}, "
raw_prop_str = prop_value.strip('"').replace('"', '\\"')
prop_str += f"{QUOTE}{raw_prop_str}{QUOTE}, "
elif prop_schema_map[prop_name]["type"] == "date":
prop_str += f"date({QUOTE}{prop_value}{QUOTE}), "
elif prop_schema_map[prop_name]["type"] == "datetime":
Expand Down Expand Up @@ -275,8 +304,10 @@ def safe_load_csv(source, header_option, limit=None):
f"INSERT EDGE `{args.edge}` (`{'`, `'.join(prop_columns)}`) VALUES "
)
for index, row in batch.iterrows():
src_str = f'{QUOTE_VID}{row["___src"]}{QUOTE_VID}'
dst_str = f'{QUOTE_VID}{row["___dst"]}{QUOTE_VID}'
raw_src_str = row["___src"].strip('"').replace('"', '\\"')
src_str = f"{QUOTE_VID}{raw_src_str}{QUOTE_VID}"
raw_dst_str = row["___dst"].strip('"').replace('"', '\\"')
dst_str = f"{QUOTE_VID}{raw_dst_str}{QUOTE_VID}"
prop_str = ""
if with_props:
for prop_name in prop_columns:
Expand All @@ -289,7 +320,8 @@ def safe_load_csv(source, header_option, limit=None):
)
prop_str += "NULL, "
elif prop_schema_map[prop_name]["type"] == "string":
prop_str += f"{QUOTE}{prop_value}{QUOTE}, "
raw_prop_str = prop_value.strip('"').replace('"', '\\"')
prop_str += f"{QUOTE}{raw_prop_str}{QUOTE}, "
elif prop_schema_map[prop_name]["type"] == "date":
prop_str += f"date({QUOTE}{prop_value}{QUOTE}), "
elif prop_schema_map[prop_name]["type"] == "datetime":
Expand Down Expand Up @@ -344,6 +376,7 @@ def args_load(line: str):
%ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/corp_share.csv --edge hold_share --src 0 --dst 1 --props 2:share --space shareholding
%ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/person_corp_share.csv --edge hold_share --src 0 --dst 1 --props 2:share --space shareholding
%ng_load --source https://github.com/wey-gu/awesome-graph-dataset/raw/main/datasets/shareholding/tiny/person_rel.csv --edge reletive_with --src 0 --dst 1 --props 2:degree --space shareholding
%ng_load --header --source https://github.com/microsoft/graphrag/raw/main/examples_notebooks/inputs/operation%20dulce/create_final_entities.parquet --tag entity --vid 1 --props 1:name --space ms_paper
"""
execute_fn = conn_pool.get_session("root", "nebula").execute
for line in test.split("\n"):
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="jupyter_nebulagraph",
version="0.13.4",
version="0.14.0",
author="Wey Gu",
author_email="[email protected]",
description="Jupyter extension for NebulaGraph",
Expand Down Expand Up @@ -33,5 +33,6 @@
"pydantic",
"scipy",
"ipywidgets",
"pyarrow",
],
)
3 changes: 2 additions & 1 deletion setup_ipython.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="ipython-ngql",
version="0.13.4",
version="0.14.0",
author="Wey Gu",
author_email="[email protected]",
description="Jupyter extension for NebulaGraph",
Expand Down Expand Up @@ -33,5 +33,6 @@
"pydantic",
"scipy",
"ipywidgets",
"pyarrow",
],
)

0 comments on commit 2c9f116

Please sign in to comment.