Mix.install([
{:kino, "~> 0.9.1"},
{:floki, "~> 0.35.2"},
{:crawly, "~> 0.8"},
{:export, "~> 0.1.0"},
{:timex, "~> 3.7.11"},
{:progress_bar, "~> 3.0.0"},
{:rustler, "~> 0.27.0"},
{:uuid, "~> 1.1"},
{:youtube_organizer, path: Path.join([__DIR__, "youtube_organizer"])}
])
require Logger
# Read the YouTube API key from the user's key file, stripping surrounding whitespace.
# File.read!/1 raises if the file is missing.
yt_api_key = String.trim(File.read!(Path.expand("~/.keys/yt_api_key.txt")))

# Last expression of this step; presumably here so the key itself is not echoed.
"Loaded api key"
defmodule YTOrg.Channels.ScrapeTube do
  @moduledoc """
  Scrapes the videos of a YouTube channel.

  The scraping itself is delegated to the `yt.py` script located in the
  `python_scripts` directory next to this file (run through `poetry`); its
  JSON output is decoded and converted into `YTOrg.YoutubeVideo` structs.
  """

  @doc """
  Fetches and parses the videos of the channel at `channel_url`.

  Returns `{:ok, [%YTOrg.YoutubeVideo{}]}` on success, or an `{:error, reason}`
  tuple when the script fails or its output is not a JSON list.
  """
  def get_channel_videos(channel_url) do
    case get_scrapetube_channel_videos(channel_url) do
      {:ok, videos} when is_list(videos) ->
        {:ok, Enum.map(videos, &parse_scraped_video(&1, channel_url))}

      {:ok, other} ->
        {:error, {:unexpected_payload, other}}

      error ->
        error
    end
  end

  @doc """
  Runs the scraping script for `channel_url` and decodes its JSON output.

  ## Options

    * `:scrape_date` - when truthy, passes the extra `get_date` argument to the
      script.

  Returns the result of `Poison.decode/1` on the script's standard output, or
  `{:error, reason}` when the script cannot be started or exits with a
  non-zero status.
  """
  def get_scrapetube_channel_videos(channel_url, opts \\ [scrape_date: true]) do
    script_path = Path.join(__DIR__, "python_scripts")
    date_args = if Keyword.get(opts, :scrape_date), do: ["get_date"], else: []
    args = ["run", "python", "yt.py", channel_url | date_args]

    try do
      # System.cmd/3 passes the arguments directly to the executable (no shell),
      # so characters in `channel_url` cannot be interpreted as shell syntax.
      # Only stdout is captured, keeping stderr noise out of the JSON payload.
      case System.cmd("poetry", args, cd: script_path) do
        {output, 0} -> Poison.decode(output)
        {_output, status} -> {:error, {:exit_status, status}}
      end
    rescue
      # Raised when `poetry` or the script directory cannot be found, or when an
      # argument is not a valid command-line string.
      error in [ErlangError, ArgumentError] -> {:error, error}
    end
  end

  # Converts one decoded video map into a struct; requires a "videoId" key.
  defp parse_scraped_video(%{"videoId" => id} = vid, channel_url) do
    %YTOrg.YoutubeVideo{
      id: id,
      url: "https://www.youtube.com/watch?v=#{id}",
      title: get_title(vid),
      thumbnails: get_thumbnails(vid),
      description: get_description(vid),
      channel_url: channel_url,
      channel_title: get_channel_title(channel_url),
      published_at: get_published_date(vid)
    }
  end

  # Parses dates such as "Jan 5, 2023" into a UTC DateTime; nil when unparsable.
  defp get_published_date(%{"dateText" => %{"simpleText" => dt}}) do
    with {:ok, ndate} <- Timex.parse(dt, "%b %e, %Y", :strftime),
         {:ok, date} <- DateTime.from_naive(ndate, "Etc/UTC") do
      date
    else
      _ -> nil
    end
  end

  # Videos without a date (e.g. scraped with `scrape_date: false`) have no
  # publication date instead of raising a FunctionClauseError.
  defp get_published_date(_), do: nil

  # The channel title is whatever follows the last "@" in the URL.
  defp get_channel_title(channel_url) do
    channel_url |> String.split("@") |> List.last()
  end

  defp get_title(%{"title" => %{"runs" => [%{"text" => title} | _]}}), do: title

  defp get_thumbnails(%{"thumbnail" => %{"thumbnails" => thumbnails}}), do: thumbnails

  defp get_description(%{"descriptionSnippet" => %{"runs" => [%{"text" => description} | _]}}) do
    description
  end

  defp get_description(_), do: nil
end
defmodule YTOrg.Channels do
  @moduledoc """
  Fetches the latest videos of one or many YouTube channels, identified by
  their `@handle` name, through `YTOrg.Channels.ScrapeTube`.
  """

  require Logger
  alias YTOrg.Channels.ScrapeTube

  @doc """
  Fetches the latest videos for a channel name or a list of channel names.

  * Given a single name (a string), returns that channel's videos as scraped;
    `opts` is ignored.
  * Given a list of names, returns all videos of all channels sorted from
    newest to oldest by `published_at`; videos without a publication date come
    last. Channels that yield no videos are reported with a log warning.

  ## Options (list form only)

    * `:async_timeout` - when present, channels are fetched concurrently and
      the value is passed as the timeout to `Task.await_many/2`. Otherwise
      channels are fetched one after another with a progress bar.
  """
  def fetch_latest_videos(channel_names, opts \\ nil)

  def fetch_latest_videos(channel_name, _opts) when is_binary(channel_name) do
    {_channel_name, videos} = try_fetch_latest_videos(channel_name)
    videos
  end

  def fetch_latest_videos(channel_names, opts) when is_list(channel_names) do
    channel_results = fetch_per_channel_latest_videos(channel_names, opts)
    log_failed_results(channel_results)

    channel_results
    |> Enum.flat_map(fn {_channel_name, videos} -> videos end)
    |> sort_newest_first()
  end

  @doc """
  Fetches the videos of `channel_name`.

  Always returns `{channel_name, videos}`; on failure a warning is logged and
  `videos` is `[]`.
  """
  def try_fetch_latest_videos(channel_name) do
    results = channel_name |> channel_name_to_channel_url() |> ScrapeTube.get_channel_videos()

    case results do
      {:ok, videos} ->
        {channel_name, videos}

      _ ->
        Logger.warning("failed fetching videos for #{channel_name}")
        {channel_name, []}
    end
  end

  # DateTime structs must be compared with DateTime.compare/2: plain term
  # ordering compares struct fields in key order (day before year).
  defp sort_newest_first(videos) do
    {dated, undated} = Enum.split_with(videos, &match?(%DateTime{}, &1.published_at))
    Enum.sort_by(dated, & &1.published_at, {:desc, DateTime}) ++ undated
  end

  defp log_failed_results(channel_results) do
    n_failed_channels = Enum.count(channel_results, fn {_channel_name, videos} -> videos == [] end)

    if n_failed_channels > 0 do
      Logger.warning("scraping failed for #{n_failed_channels} channels")
    end
  end

  defp fetch_per_channel_latest_videos(channel_names, opts) do
    case async_timeout(opts) do
      nil -> fetch_sequentially(channel_names)
      timeout -> fetch_concurrently(channel_names, timeout)
    end
  end

  defp async_timeout(opts) when is_list(opts), do: Keyword.get(opts, :async_timeout)
  defp async_timeout(_opts), do: nil

  # One task per channel; Task.await_many/2 exits the caller if the timeout elapses.
  defp fetch_concurrently(channel_names, timeout) do
    channel_names
    |> Enum.map(fn channel_name -> Task.async(fn -> try_fetch_latest_videos(channel_name) end) end)
    |> Task.await_many(timeout)
  end

  # Renders progress after each channel so the bar ends at total/total.
  defp fetch_sequentially(channel_names) do
    total = length(channel_names)

    channel_names
    |> Enum.with_index(1)
    |> Enum.map(fn {channel_name, done} ->
      result = try_fetch_latest_videos(channel_name)
      ProgressBar.render(done, total)
      result
    end)
  end

  defp channel_name_to_channel_url(channel_name) do
    "https://www.youtube.com/@#{channel_name}"
  end
end
# Smoke test: fetch a single channel and inspect the `{channel_name, videos}` result.
"samwitteveenai" |> YTOrg.Channels.try_fetch_latest_videos()
defmodule YTOrg.KinoHelpers do
  @moduledoc """
  Helpers that turn `YTOrg.YoutubeVideo` structs into markdown snippets and
  Kino widgets.
  """

  @doc "Returns a markdown link with the video title as text and the video URL as target."
  def get_link_md(%YTOrg.YoutubeVideo{title: title, url: url}), do: "[#{title}](#{url})"

  @doc "Returns the markdown link of `vid` wrapped in a `Kino.Markdown` widget."
  def get_link_kino(%YTOrg.YoutubeVideo{} = vid) do
    Kino.Markdown.new(get_link_md(vid))
  end

  @doc """
  Returns the URL of the thumbnail of the given `size` (`"medium"` by default).

  When the thumbnails are a list, they are ordered by `"width"` and labelled
  `"small"`, `"medium"`, `"large"` in that order. Otherwise the thumbnails are
  looked up by `size` directly. Returns `nil` when no such thumbnail exists.
  """
  def get_thumbnail_url(vid, size \\ "medium")

  def get_thumbnail_url(%YTOrg.YoutubeVideo{thumbnails: thumbnails}, size)
      when is_list(thumbnails) do
    by_width = Enum.sort_by(thumbnails, & &1["width"])
    labelled = Map.new(Enum.zip(["small", "medium", "large"], by_width))
    labelled[size]["url"]
  end

  def get_thumbnail_url(%YTOrg.YoutubeVideo{thumbnails: thumbnails}, size) do
    thumbnails[size]["url"]
  end

  @doc """
  Puts a link to `vid` in front of `layout` inside a new grid; returns `layout`
  unchanged when the video is `nil`.
  """
  def maybe_add_preceding_link_kino(layout, nil), do: layout

  def maybe_add_preceding_link_kino(layout, %YTOrg.YoutubeVideo{} = vid) do
    Kino.Layout.grid([get_link_kino(vid), layout])
  end
end
defmodule YTOrg.YoutubeKinoTiler do
  @moduledoc """
  Builds Kino "tiles" (boxed grids) for videos.

  A tiler holds a `format_tile` function of arity 2 that receives the video and
  the list of default widgets (thumbnail image, channel link, title link) and
  returns the final tile.
  """

  defstruct [:format_tile]

  @doc "Creates a tiler that lays out only the default widgets."
  def new() do
    %__MODULE__{format_tile: &make_basic_tile_kino/2}
  end

  @doc "Creates a tiler that uses the given `format_tile` function (`video, inputs -> tile`)."
  def new(format_tile) do
    %__MODULE__{format_tile: format_tile}
  end

  @doc """
  Builds the tile for `video` with the tiler `kt`.

  The default widgets passed to the formatter are, in order: the thumbnail
  image, a link to the channel, and the video title linking to the video.
  """
  def get_video_tile(kt, video) do
    channel = "[#{video.channel_title}](#{video.channel_url})" |> Kino.Markdown.new()

    # Markdown image syntax: ![alt text](image url).
    image =
      "![#{video.title}](#{YTOrg.KinoHelpers.get_thumbnail_url(video)})" |> Kino.Markdown.new()

    title_with_link = video |> YTOrg.KinoHelpers.get_link_md() |> Kino.Markdown.new()
    video |> kt.format_tile.([image, channel, title_with_link])
  end

  @doc "Default formatter: a boxed grid of the default widgets; the video is ignored."
  def make_basic_tile_kino(_video, inputs) do
    Kino.Layout.grid(inputs, boxed: true)
  end

  @doc """
  Returns a formatter that appends the values of the given video `fields`
  (rendered as markdown) after the default widgets.
  """
  def make_tile_kino_with_additional_fields(fields) do
    fn video, inputs ->
      field_values =
        Enum.map(fields, fn field -> video |> Map.get(field) |> convert_field_to_markdown() end)

      Kino.Layout.grid(inputs ++ field_values, boxed: true)
    end
  end

  @doc """
  Converts a field value to a `Kino.Markdown` widget: dates are formatted as
  `YYYY/MM/DD`, `nil` becomes empty markdown, anything else is passed as-is.
  """
  def convert_field_to_markdown(%DateTime{} = dt) do
    dt |> Calendar.strftime("%Y/%m/%d") |> Kino.Markdown.new()
  end

  def convert_field_to_markdown(nil), do: Kino.Markdown.new("")
  def convert_field_to_markdown(v), do: v |> Kino.Markdown.new()
end
alias YTOrg.YoutubeKinoTiler

# Tiler showing only the default widgets.
basic_tiler = YoutubeKinoTiler.new()
urls = ["https://www.youtube.com/@DataIndependent"]

# Render one example tile; the match raises if the channel yields no videos.
[example_video | _] = "samwitteveenai" |> YTOrg.Channels.fetch_latest_videos()
YTOrg.YoutubeKinoTiler.get_video_tile(basic_tiler, example_video)

selected_youtubers = ~w(samwitteveenai jamesbriggs DataIndependent)

# Tiler that additionally shows the publication date and the description.
tiler =
  [:published_at, :description]
  |> YoutubeKinoTiler.make_tile_kino_with_additional_fields()
  |> YoutubeKinoTiler.new()

selected_latest_videos = selected_youtubers |> YTOrg.Channels.fetch_latest_videos()
:ok
[vid | _] = selected_latest_videos
defmodule YTOrg.AggregateVideos do
  @moduledoc """
  Groups videos and aggregates their textual content per group.
  """

  @doc """
  Returns a map from group key to the lower-cased concatenation of the titles
  and descriptions of all videos in that group.

  Videos are grouped by the field `group_by` (`:channel_title` by default).
  Each video must have `:title` and `:description` keys (structs and plain
  maps both work); a `nil` title or description is skipped instead of raising.
  """
  def aggregate_texts(videos, group_by \\ :channel_title) do
    videos
    |> group_videos(group_by)
    |> Map.new(fn {group_name, grouped} ->
      {group_name, grouped |> Enum.map_join(" ", &video_text/1) |> String.downcase()}
    end)
  end

  @doc "Groups `videos` by the value of their `group_by` field."
  def group_videos(videos, group_by) do
    Enum.group_by(videos, &Map.get(&1, group_by))
  end

  # "title description", leaving out whichever part is nil.
  defp video_text(%{title: title, description: description}) do
    [title, description]
    |> Enum.reject(&is_nil/1)
    |> Enum.join(" ")
  end
end
# Aggregate title/description text per creator (grouped by channel title).
video_texts_by_creators = YTOrg.AggregateVideos.aggregate_texts(selected_latest_videos)

# One word-cloud image per creator, keyed by creator.
creators_wordclouds =
  Map.new(video_texts_by_creators, fn {creator, text} ->
    {creator, YTOrg.WordCloud.get_wordcloud_image(text)}
  end)

:ok
creators_wordclouds["jamesbriggs"]
creators_wordclouds["DataIndependent"]
# Keep only dated videos and order them newest first using DateTime.compare/2.
sorted_videos =
  selected_latest_videos
  |> Enum.reject(&is_nil(&1.published_at))
  |> Enum.sort_by(& &1.published_at, {:desc, DateTime})

# One tile per video, rendered with the extended tiler.
video_tiles = Enum.map(sorted_videos, &YoutubeKinoTiler.get_video_tile(tiler, &1))

:ok
# video_tiles = selected_latest_videos |> Enum.map(fn vid -> basic_tiler |> YoutubeKinoTiler.get_video_tile(vid) end)
# Number of tiles shown per page.
page_size = 18

# Round up so a partially filled last page is counted; integer division
# followed by ceil/1 would round down.
n_pages = ceil(Enum.count(video_tiles) / page_size)

# Offer one option per page. At least one option is kept because `1..0` is a
# descending range in Elixir and would yield pages 1 and 0.
page_select =
  Kino.Input.select("Page", for(p <- 1..max(n_pages, 1), do: {p, Integer.to_string(p)}))

# Show the first page as a three-column grid.
video_tiles |> Enum.take(page_size) |> Kino.Layout.grid(columns: 3)
# It is notoriously hard to get an overview of subscribed channels.
# We're going to tackle that. First, let's extract the subscriptions from the list in YouTube's left panel.
# Saved HTML file, located in the assets directory next to this file.
subs_path = Path.join(Path.dirname(__ENV__.file), "assets/yt_subs.html")

# File.read!/1 raises if the file is missing; a parse failure raises a MatchError.
{:ok, subs_html} = Floki.parse_document(File.read!(subs_path))
:ok
defmodule YTOrg.YoutubeSubsExtractor do
  @moduledoc """
  Extracts channel subscriptions from a parsed HTML document (a Floki tree).
  """

  @doc """
  Returns a list of subscription maps (`:full_title`, `:channel_url`, `:title`)
  for every element with the `yt-simple-endpoint` class that qualifies as a
  subscription link (see `maybe_get_sub/1`).
  """
  def extract_subs(subs_html) do
    subs_html
    |> Floki.find(".yt-simple-endpoint")
    |> Enum.flat_map(&maybe_get_sub/1)
  end

  @doc """
  Returns `[sub_data]` when the element is an `a` tag whose attributes yield a
  title and link, and `[]` for anything else.
  """
  def maybe_get_sub({"a", attrs, _children}) do
    case get_title_and_link(attrs) do
      {:ok, sub_data} -> [sub_data]
      _ -> []
    end
  end

  # Non-anchor elements carrying the class are skipped instead of raising.
  def maybe_get_sub(_other), do: []

  @doc """
  Extracts the subscription data from an attribute list.

  Succeeds only when the list ends with a `"title"` attribute immediately
  followed by an `"href"` attribute; leading attributes are skipped. Returns
  `{:ok, map}` or `{:error, nil}`.
  """
  def get_title_and_link([]), do: {:error, nil}
  def get_title_and_link([_]), do: {:error, nil}

  def get_title_and_link([{"title", title}, {"href", href}]) do
    {:ok,
     %{
       full_title: title,
       channel_url: "https://www.youtube.com#{href}",
       # The short title is the href with "/@" removed.
       title: String.replace(href, "/@", "")
     }}
  end

  def get_title_and_link([_ | rest]), do: get_title_and_link(rest)
end
alias YTOrg.YoutubeSubsExtractor

# Extract every subscription from the parsed HTML and count them.
sub_data = YoutubeSubsExtractor.extract_subs(subs_html)
Enum.count(sub_data)
# Well, no wonder I can't wrap my head around over 300 subs.
# Short titles of all subscriptions.
subbed_channels = Enum.map(sub_data, & &1.title)

# Persist the channel list as JSON in the assets directory next to this file.
File.write!(Path.join(__DIR__, "assets/subbed_channels.json"), Poison.encode!(subbed_channels))

# Time a sequential fetch of the first two channels; :timer.tc/1 reports microseconds.
{time, _} =
  :timer.tc(fn ->
    subbed_channels
    |> Enum.take(2)
    |> YTOrg.Channels.fetch_latest_videos()
  end)

# Elapsed time in seconds.
time / 1_000_000
# Try scraping a "/channel/<id>"-style URL directly.
YTOrg.Channels.ScrapeTube.get_channel_videos(
  "http://youtube.com/@/channel/UCVhEmzWMkRhVRKvFVvf-YoA"
)

div(300, 16)

n = Enum.count(subbed_channels)

# Time a concurrent fetch of all subscribed channels.
# NOTE(review): the timeout is handed to Task.await_many/2, which takes
# milliseconds, so this is roughly 67 minutes — confirm that is intended.
{time, subbed_videos} =
  :timer.tc(fn ->
    subbed_channels
    |> Enum.take(n)
    |> YTOrg.Channels.fetch_latest_videos(async_timeout: 4 * 1_000_000)
  end)

# Elapsed time in seconds (:timer.tc/1 reports microseconds).
time / 1_000_000
Enum.count(subbed_videos)
300_000 * 300 / 10 / 1_000_000
Enum.count(subbed_videos)