diff --git a/Gemfile b/Gemfile index b80621bf9ded48..d5c8fecf3d518f 100644 --- a/Gemfile +++ b/Gemfile @@ -159,6 +159,9 @@ group :test do # Stub web requests for specs gem 'webmock', '~> 3.18' + + # Websocket driver for testing integration between rails/sidekiq and streaming + gem 'websocket-driver', '~> 0.8', require: false end group :development do diff --git a/Gemfile.lock b/Gemfile.lock index 73755eb3b9e71d..609a93837172cf 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -502,7 +502,7 @@ GEM tzinfo validate_url webfinger (~> 2.0) - openssl (3.3.0) + openssl (3.3.1) openssl-signature_algorithm (1.3.0) openssl (> 2.0) opentelemetry-api (1.6.0) @@ -649,7 +649,7 @@ GEM activesupport (>= 3.0.0) raabro (1.4.0) racc (1.8.1) - rack (3.1.16) + rack (3.2.3) rack-attack (6.7.0) rack (>= 1.0, < 4) rack-cors (3.0.0) @@ -906,7 +906,7 @@ GEM unicode-display_width (3.1.5) unicode-emoji (~> 4.0, >= 4.0.4) unicode-emoji (4.0.4) - uri (1.0.3) + uri (1.0.4) useragent (0.16.11) validate_url (1.0.15) activemodel (>= 3.0.0) @@ -1103,6 +1103,7 @@ DEPENDENCIES webauthn (~> 3.0) webmock (~> 3.18) webpush! + websocket-driver (~> 0.8) xorcist (~> 1.1) RUBY VERSION diff --git a/app/controllers/admin/dashboard_controller.rb b/app/controllers/admin/dashboard_controller.rb index 5b0867dcfbac2f..fe314daeca69f6 100644 --- a/app/controllers/admin/dashboard_controller.rb +++ b/app/controllers/admin/dashboard_controller.rb @@ -9,10 +9,16 @@ def index @pending_appeals_count = Appeal.pending.async_count @pending_reports_count = Report.unresolved.async_count - @pending_tags_count = Tag.pending_review.async_count + @pending_tags_count = pending_tags.async_count @pending_users_count = User.pending.async_count @system_checks = Admin::SystemCheck.perform(current_user) @time_period = (29.days.ago.to_date...Time.now.utc.to_date) end + + private + + def pending_tags + ::Trends::TagFilter.new(status: :pending_review).results + end end end diff --git a/app/javascript/mastodon/components/featured_carousel.tsx b/app/javascript/mastodon/components/featured_carousel.tsx index 195331ef9f5d76..df64c43b421836 100644 --- a/app/javascript/mastodon/components/featured_carousel.tsx +++ b/app/javascript/mastodon/components/featured_carousel.tsx @@ -20,7 +20,7 @@ import { useDrag } from '@use-gesture/react'; import { expandAccountFeaturedTimeline } from '@/mastodon/actions/timelines'; import { Icon } from '@/mastodon/components/icon'; import { IconButton } from '@/mastodon/components/icon_button'; -import StatusContainer from '@/mastodon/containers/status_container'; +import { StatusQuoteManager } from '@/mastodon/components/status_quoted'; import { usePrevious } from '@/mastodon/hooks/usePrevious'; import { useAppDispatch, useAppSelector } from '@/mastodon/store'; import ChevronLeftIcon from '@/material-icons/400-24px/chevron_left.svg?react'; @@ -218,12 +218,7 @@ const FeaturedCarouselItem: React.FC< ref={handleRef} {...props} > - + ); }; diff --git a/app/javascript/styles/entrypoints/mailer.scss b/app/javascript/styles/entrypoints/mailer.scss index 7d2a54afae0ca2..fcbbd66f4c7747 100644 --- a/app/javascript/styles/entrypoints/mailer.scss +++ b/app/javascript/styles/entrypoints/mailer.scss @@ -88,6 +88,14 @@ table + p { padding: 24px; } +.email-inner-nested-card-td { + border-radius: 12px; + padding: 18px; + overflow: hidden; + background-color: #fff; + border: 1px solid #dfdee3; +} + // Account .email-account-banner-table { background-color: #f3f2f5; @@ -559,12 +567,29 @@ table + p { } } +.email-quote-header-img { + width: 34px; + + img { + width: 34px; + height: 34px; + border-radius: 8px; + overflow: hidden; + } +} + .email-status-header-text { padding-left: 16px; padding-right: 16px; vertical-align: middle; } +.email-quote-header-text { + padding-left: 14px; + padding-right: 14px; + vertical-align: middle; +} + .email-status-header-name { font-size: 16px; font-weight: 600; @@ -578,6 +603,19 @@ table + p { color: #746a89; } +.email-quote-header-name { + font-size: 14px; + font-weight: 600; + line-height: 18px; + color: #17063b; +} + +.email-quote-header-handle { + font-size: 13px; + line-height: 18px; + color: #746a89; +} + .email-status-content { padding-top: 24px; } @@ -589,6 +627,10 @@ table + p { } .email-status-prose { + .quote-inline { + display: none; + } + p { font-size: 14px; line-height: 20px; diff --git a/app/lib/activitypub/parser/status_parser.rb b/app/lib/activitypub/parser/status_parser.rb index 32af75fd2e471b..3227059d515295 100644 --- a/app/lib/activitypub/parser/status_parser.rb +++ b/app/lib/activitypub/parser/status_parser.rb @@ -175,7 +175,7 @@ def quote_approval_uri def quote_subpolicy(subpolicy) flags = 0 - allowed_actors = as_array(subpolicy) + allowed_actors = as_array(subpolicy).dup allowed_actors.uniq! flags |= Status::QUOTE_APPROVAL_POLICY_FLAGS[:public] if allowed_actors.delete('as:Public') || allowed_actors.delete('Public') || allowed_actors.delete('https://www.w3.org/ns/activitystreams#Public') diff --git a/app/lib/permalink_redirector.rb b/app/lib/permalink_redirector.rb index 142a05d10d3daa..301a588686ab48 100644 --- a/app/lib/permalink_redirector.rb +++ b/app/lib/permalink_redirector.rb @@ -26,7 +26,7 @@ def object end def redirect_path - return ActivityPub::TagManager.instance.url_for(object) if object.present? + return ActivityPub::TagManager.instance.url_for(object) || ActivityPub::TagManager.instance.uri_for(object) if object.present? @path.delete_prefix('/deck') if @path.start_with?('/deck') end diff --git a/app/models/concerns/account/suspensions.rb b/app/models/concerns/account/suspensions.rb index cdf3c1cb245fc3..45a4e517ed4905 100644 --- a/app/models/concerns/account/suspensions.rb +++ b/app/models/concerns/account/suspensions.rb @@ -33,6 +33,10 @@ def suspend!(date: Time.now.utc, origin: :local, block_email: true) update!(suspended_at: date, suspension_origin: origin) create_canonical_email_block! if block_email end + + # This terminates all connections for the given account with the streaming + # server: + redis.publish("timeline:system:#{id}", Oj.dump(event: :kill)) if local? end def unsuspend! diff --git a/app/models/user.rb b/app/models/user.rb index 6c14d9f854e2ad..3d21805a22efc0 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -185,6 +185,10 @@ def valid_invitation? def disable! update!(disabled: true) + + # This terminates all connections for the given account with the streaming + # server: + redis.publish("timeline:system:#{account.id}", Oj.dump(event: :kill)) end def enable! @@ -378,17 +382,22 @@ def revoke_access! end def reset_password! + # First, change password to something random, this revokes sessions and on-going access: + change_password!(SecureRandom.hex) + + # Finally, send a reset password prompt to the user + send_reset_password_instructions + end + + def change_password!(new_password) # First, change password to something random and deactivate all sessions transaction do - update(password: SecureRandom.hex) + update(password: new_password) session_activations.destroy_all end # Then, remove all authorized applications and connected push subscriptions revoke_access! - - # Finally, send a reset password prompt to the user - send_reset_password_instructions end protected diff --git a/app/services/activitypub/process_status_update_service.rb b/app/services/activitypub/process_status_update_service.rb index f2ad8fe75c9733..f0b91f03fe9aa5 100644 --- a/app/services/activitypub/process_status_update_service.rb +++ b/app/services/activitypub/process_status_update_service.rb @@ -85,6 +85,8 @@ def handle_implicit_update! update_quote_approval! update_counts! end + + broadcast_updates! if @status.quote&.state_previously_changed? end def update_interaction_policies! diff --git a/app/views/notification_mailer/_nested_quote.html.haml b/app/views/notification_mailer/_nested_quote.html.haml new file mode 100644 index 00000000000000..e66736399f47a8 --- /dev/null +++ b/app/views/notification_mailer/_nested_quote.html.haml @@ -0,0 +1,17 @@ +%table.email-w-full{ cellspacing: 0, cellpadding: 0, border: 0, role: 'presentation' } + %tr + %td.email-quote-header-img + = image_tag full_asset_url(status.account.avatar.url), alt: '', width: 34, height: 34 + %td.email-quote-header-text + %h2.email-quote-header-name + = display_name(status.account) + %p.email-quote-header-handle + @#{status.account.pretty_acct} + +%table.email-w-full{ cellspacing: 0, cellpadding: 0, border: 0, role: 'presentation' } + %tr + %td.email-status-content + = render 'status_content', status: status + + %p.email-status-footer + = link_to l(status.created_at.in_time_zone(time_zone.presence), format: :with_time_zone), web_url("@#{status.account.pretty_acct}/#{status.id}") diff --git a/app/views/notification_mailer/_status.html.haml b/app/views/notification_mailer/_status.html.haml index bf38dc9aa26ca5..064709e7dac75e 100644 --- a/app/views/notification_mailer/_status.html.haml +++ b/app/views/notification_mailer/_status.html.haml @@ -11,21 +11,12 @@ %table.email-w-full{ cellspacing: 0, cellpadding: 0, border: 0, role: 'presentation' } %tr %td.email-status-content - .auto-dir - - if status.spoiler_text? - %p.email-status-spoiler - = status.spoiler_text - - .email-status-prose - = status_content_format(status) - - - if status.ordered_media_attachments.size.positive? - %p.email-status-media - - status.ordered_media_attachments.each do |a| - - if status.local? - = link_to full_asset_url(a.file.url(:original)), full_asset_url(a.file.url(:original)) - - else - = link_to a.remote_url, a.remote_url + = render 'status_content', status: status + - if status.local? && status.quote + %table.email-inner-card-table{ cellspacing: 0, cellpadding: 0, border: 0, role: 'presentation' } + %tr + %td.email-inner-nested-card-td + = render 'nested_quote', status: status.quote.quoted_status, time_zone: time_zone %p.email-status-footer = link_to l(status.created_at.in_time_zone(time_zone.presence), format: :with_time_zone), web_url("@#{status.account.pretty_acct}/#{status.id}") diff --git a/app/views/notification_mailer/_status.text.erb b/app/views/notification_mailer/_status.text.erb index e03e8346c16a25..13711ee74d9c70 100644 --- a/app/views/notification_mailer/_status.text.erb +++ b/app/views/notification_mailer/_status.text.erb @@ -4,5 +4,9 @@ > <% end %> > <%= raw word_wrap(extract_status_plain_text(status), break_sequence: "\n> ") %> +<% if status.local? && status.quote %> +> +>> <%= raw word_wrap(extract_status_plain_text(status.quote.quoted_status), break_sequence: "\n>> ") %> +<% end %> <%= raw t('application_mailer.view')%> <%= web_url("@#{status.account.pretty_acct}/#{status.id}") %> diff --git a/app/views/notification_mailer/_status_content.html.haml b/app/views/notification_mailer/_status_content.html.haml new file mode 100644 index 00000000000000..f95ba8ccba856a --- /dev/null +++ b/app/views/notification_mailer/_status_content.html.haml @@ -0,0 +1,15 @@ +.auto-dir + - if status.spoiler_text? + %p.email-status-spoiler + = status.spoiler_text + + .email-status-prose + = status_content_format(status) + + - if status.ordered_media_attachments.size.positive? + %p.email-status-media + - status.ordered_media_attachments.each do |a| + - if status.local? + = link_to full_asset_url(a.file.url(:original)), full_asset_url(a.file.url(:original)) + - else + = link_to a.remote_url, a.remote_url diff --git a/app/workers/activitypub/refetch_and_verify_quote_worker.rb b/app/workers/activitypub/refetch_and_verify_quote_worker.rb index 0c7ecd9b2ac021..e2df0231030767 100644 --- a/app/workers/activitypub/refetch_and_verify_quote_worker.rb +++ b/app/workers/activitypub/refetch_and_verify_quote_worker.rb @@ -10,6 +10,7 @@ class ActivityPub::RefetchAndVerifyQuoteWorker def perform(quote_id, quoted_uri, options = {}) quote = Quote.find(quote_id) ActivityPub::VerifyQuoteService.new.call(quote, fetchable_quoted_uri: quoted_uri, request_id: options[:request_id]) + ::DistributionWorker.perform_async(quote.status_id, { 'update' => true }) if quote.state_previously_changed? rescue ActiveRecord::RecordNotFound # Do nothing true diff --git a/docker-compose.yml b/docker-compose.yml index 2ccd5b89ae3e61..54df3ee03d1d25 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,7 +59,7 @@ services: web: # You can uncomment the following line if you want to not use the prebuilt image, for example if you have local code changes build: . - image: kmyblue:20.2-dev + image: kmyblue:20.3 restart: always env_file: .env.production command: bundle exec puma -C config/puma.rb @@ -83,7 +83,7 @@ services: build: dockerfile: ./streaming/Dockerfile context: . - image: kmyblue-streaming:20.2-dev + image: kmyblue-streaming:20.3 restart: always env_file: .env.production command: node ./streaming/index.js @@ -101,7 +101,7 @@ services: sidekiq: build: . - image: kmyblue:20.2-dev + image: kmyblue:20.3 restart: always env_file: .env.production command: bundle exec sidekiq diff --git a/lib/mastodon/cli/accounts.rb b/lib/mastodon/cli/accounts.rb index 1b33f56055a901..25e966bd8eabc9 100644 --- a/lib/mastodon/cli/accounts.rb +++ b/lib/mastodon/cli/accounts.rb @@ -165,14 +165,17 @@ def modify(username) user.role_id = nil end - password = SecureRandom.hex if options[:reset_password] - user.password = password if options[:reset_password] user.email = options[:email] if options[:email] user.disabled = false if options[:enable] user.disabled = true if options[:disable] user.approved = true if options[:approve] user.disable_two_factor! if options[:disable_2fa] + # Password changes are a little different, as we also need to ensure + # sessions, subscriptions, and access tokens are revoked after changing: + password = SecureRandom.hex if options[:reset_password] + user.change_password!(password) if options[:reset_password] + if user.save user.confirm if options[:confirm] diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb index eb039ae6f45619..ec5092c9d6d74a 100644 --- a/lib/mastodon/version.rb +++ b/lib/mastodon/version.rb @@ -13,7 +13,7 @@ def kmyblue_major end def kmyblue_minor - 2 + 3 end def kmyblue_flag diff --git a/spec/lib/mastodon/cli/accounts_spec.rb b/spec/lib/mastodon/cli/accounts_spec.rb index 111703a18bbe6b..927c6ca8debed8 100644 --- a/spec/lib/mastodon/cli/accounts_spec.rb +++ b/spec/lib/mastodon/cli/accounts_spec.rb @@ -361,11 +361,20 @@ def account_from_options context 'with --reset-password option' do let(:options) { { reset_password: true } } + let(:user) { Fabricate(:user, password: original_password) } + let(:original_password) { 'foobar12345' } + let(:new_password) { 'new_password12345' } + it 'returns a new password for the user' do - allow(SecureRandom).to receive(:hex).and_return('new_password') + allow(SecureRandom).to receive(:hex).and_return(new_password) + allow(Account).to receive(:find_local).and_return(user.account) + allow(user).to receive(:change_password!).and_call_original expect { subject } - .to output_results('new_password') + .to output_results(new_password) + + expect(user).to have_received(:change_password!).with(new_password) + expect(user.reload).to_not be_external_or_valid_password(original_password) end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index a9ab15a956ed6c..7088266b34eb38 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -382,12 +382,15 @@ let(:current_sign_in_at) { Time.zone.now } - before do + it 'disables user' do + allow(redis).to receive(:publish) + user.disable! - end - it 'disables user' do expect(user).to have_attributes(disabled: true) + + expect(redis) + .to have_received(:publish).with("timeline:system:#{user.account.id}", Oj.dump(event: :kill)).once end end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 3d3e556f353d3b..6be93ecb70ef99 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -30,7 +30,8 @@ # This needs to be defined before Rails is initialized STREAMING_PORT = ENV.fetch('TEST_STREAMING_PORT', '4020') -ENV['STREAMING_API_BASE_URL'] = "http://localhost:#{STREAMING_PORT}" +STREAMING_HOST = ENV.fetch('TEST_STREAMING_HOST', 'localhost') +ENV['STREAMING_API_BASE_URL'] = "http://#{STREAMING_HOST}:#{STREAMING_PORT}" require_relative '../config/environment' diff --git a/spec/support/streaming_client.rb b/spec/support/streaming_client.rb new file mode 100644 index 00000000000000..02186e781c7d3e --- /dev/null +++ b/spec/support/streaming_client.rb @@ -0,0 +1,205 @@ +# frozen_string_literal: true + +require 'websocket/driver' + +class StreamingClient + module AUTHENTICATION + SUBPROTOCOL = 1 + AUTHORIZATION_HEADER = 2 + QUERY_PARAMETER = 3 + end + + class Connection + attr_reader :url, :messages, :last_error + attr_accessor :logger, :protocols + + def initialize(url) + @uri = URI.parse(url) + @query_params = @uri.query.present? ? URI.decode_www_form(@uri.query).to_h : {} + @protocols = nil + @headers = {} + + @dead = false + + @events_queue = Thread::Queue.new + @messages = [] + @last_error = nil + end + + def set_header(key, value) + @headers[key] = value + end + + def set_query_param(key, value) + @query_params[key] = value + end + + def driver + return @driver if defined?(@driver) + + @uri.query = URI.encode_www_form(@query_params) + @url = @uri.to_s + @tcp = TCPSocket.new(@uri.host, @uri.port) + + @driver = WebSocket::Driver.client(self, { + protocols: @protocols, + }) + + @headers.each_pair do |key, value| + @driver.set_header(key, value) + end + + at_exit do + @driver.close + end + + @driver.on(:open) do + @events_queue.enq({ event: :opened }) + end + + @driver.on(:message) do |event| + @events_queue.enq({ event: :message, payload: event.data }) + @messages << event.data + end + + @driver.on(:error) do |event| + logger&.debug(event.message) + @events_queue.enq({ event: :error, payload: event }) + @last_error = event + end + + @driver.on(:close) do |event| + @events_queue.enq({ event: :closing, payload: event }) + finalize(event) + end + + @thread = Thread.new do + @driver.parse(@tcp.read(1)) until @dead || @tcp.closed? + rescue Errno::ECONNRESET + # Create a synthetic close event: + close_event = WebSocket::Driver::CloseEvent.new( + WebSocket::Driver::Hybi::ERRORS[:unexpected_condition], + 'Connection reset' + ) + + finalize(close_event) + end + + @driver + end + + def wait_for_event(expected_event, timeout: 10) + Timeout.timeout(timeout) do + loop do + event = dequeue_event + + return nil if event.nil? && @events_queue.closed? + return event[:payload] unless event.nil? || event[:event] != expected_event + end + end + end + + def write(data) + @tcp.write(data) + rescue Errno::EPIPE => e + logger&.debug("EPIPE: #{e}") + end + + def finalize(event) + @dead = true + @events_queue.enq({ event: :closed, payload: event }) + @events_queue.close + @thread.kill + end + + def dequeue_event + event = @events_queue.pop + logger&.debug(event) unless event.nil? + event + end + end + + def initialize + @logger = Logger.new($stdout) + @logger.level = 'info' + + @connection = Connection.new("ws://#{STREAMING_HOST}:#{STREAMING_PORT}/api/v1/streaming") + @connection.logger = @logger + end + + def debug! + @logger.debug! + end + + def authenticate(access_token, authentication_method = StreamingClient::AUTHENTICATION::SUBPROTOCOL) + raise 'Invalid access_token passed to StreamingClient, expected a string' unless access_token.is_a?(String) + + case authentication_method + when AUTHENTICATION::QUERY_PARAMETER + @connection.set_query_param('access_token', access_token) + when AUTHENTICATION::SUBPROTOCOL + @connection.protocols = access_token + when AUTHENTICATION::AUTHORIZATION_HEADER + @connection.set_header('Authorization', "Bearer #{access_token}") + else + raise 'Invalid authentication method' + end + end + + def connect + @connection.driver.start + @connection.wait_for_event(:opened) + end + + def subscribe(channel, **params) + send(Oj.dump({ type: 'subscribe', stream: channel }.merge(params))) + end + + def wait_for(event = nil) + @connection.wait_for_event(event) + end + + def wait_for_message + message = @connection.wait_for_event(:message) + event = Oj.load(message) + event['payload'] = Oj.load(event['payload']) if event['payload'] + + event.deep_symbolize_keys + end + + delegate :status, :state, to: :'@connection.driver' + delegate :messages, to: :@connection + + def open? + state == :open + end + + def closing? + state == :closing + end + + def closed? + state == :closed + end + + def send(message) + @connection.driver.text(message) if open? + end + + def close + return if closed? + + @connection.driver.close unless closing? + @connection.wait_for_event(:closed) + end +end + +module StreamingClientHelper + def streaming_client + @streaming_client ||= StreamingClient.new + end +end + +RSpec.configure do |config| + config.include StreamingClientHelper, :streaming +end diff --git a/spec/support/streaming_server_manager.rb b/spec/support/streaming_server_manager.rb index d98f7dd960735e..b565ed79a88d56 100644 --- a/spec/support/streaming_server_manager.rb +++ b/spec/support/streaming_server_manager.rb @@ -12,6 +12,11 @@ def start(port: 4020) queue = Queue.new + if ENV['DEBUG_STREAMING_SERVER'].present? + logger = Logger.new($stdout) + logger.level = 'debug' + end + @queue = queue @running_thread = Thread.new do @@ -31,7 +36,7 @@ def start(port: 4020) # Spawn a thread to listen on streaming server output output_thread = Thread.new do stdout_err.each_line do |line| - Rails.logger.info "Streaming server: #{line}" + logger&.info "Streaming server: #{line}" if status == :starting && line.match('Streaming API now listening on') status = :started @@ -115,12 +120,12 @@ def stop self.use_transactional_tests = true end - private - def streaming_server_manager @streaming_server_manager ||= StreamingServerManager.new end + private + def streaming_examples_present? RSpec.world.filtered_examples.values.flatten.any? { |example| example.metadata[:streaming] == true } end diff --git a/spec/system/admin/dashboard_spec.rb b/spec/system/admin/dashboard_spec.rb index 06d31cde44e965..d0cedd2ed19ffa 100644 --- a/spec/system/admin/dashboard_spec.rb +++ b/spec/system/admin/dashboard_spec.rb @@ -9,6 +9,7 @@ before do stub_system_checks Fabricate :software_update + Fabricate :tag, requested_review_at: 5.minutes.ago sign_in(user) end @@ -18,6 +19,7 @@ expect(page) .to have_title(I18n.t('admin.dashboard.title')) .and have_content(I18n.t('admin.system_checks.software_version_patch_check.message_html')) + .and have_content('0 pending hashtags') end private diff --git a/spec/system/streaming/channel_subscriptions_spec.rb b/spec/system/streaming/channel_subscriptions_spec.rb new file mode 100644 index 00000000000000..447ea64f22f35f --- /dev/null +++ b/spec/system/streaming/channel_subscriptions_spec.rb @@ -0,0 +1,144 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'Channel Subscriptions', :inline_jobs, :streaming do + let(:application) { Fabricate(:application, confidential: false) } + let(:scopes) { nil } + let(:access_token) { Fabricate(:accessible_access_token, resource_owner_id: user_account.user.id, application: application, scopes: scopes) } + + let(:user_account) { Fabricate(:account, username: 'alice', domain: nil) } + let(:bob_account) { Fabricate(:account, username: 'bob') } + + after do + streaming_client.close + end + + context 'when the access token has insufficient scope to read statuses' do + let(:scopes) { 'profile' } + + it 'cannot subscribe to the public:local channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('public:local') + + # Receive the error back from the subscription attempt: + message = streaming_client.wait_for_message + + expect(message).to include( + error: 'Access token does not have the required scopes', + status: 401 + ) + end + end + + context 'when the access token has read scope' do + let(:scopes) { 'read' } + + it 'can subscribing to the public:local channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('public:local') + + # We need to publish a status as there is no positive acknowledgement of + # subscriptions: + status = PostStatusService.new.call(bob_account, text: 'Hello @alice') + + # And then we want to receive that status: + message = streaming_client.wait_for_message + + expect(message).to include( + stream: be_an(Array).and(contain_exactly('public:local')), + event: 'update', + payload: include( + id: status.id.to_s + ) + ) + end + + it 'can subscribing to the user:notifications channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('user:notification') + + # We need to perform an action that triggers a notification as there is + # no positive acknowledgement of subscriptions: + first_status = PostStatusService.new.call(user_account, text: 'Test') + ReblogService.new.call(bob_account, first_status) + + message = streaming_client.wait_for_message + + expect(message).to include( + event: 'notification', + stream: ['user:notification'] + ) + end + end + + context 'when the access token has read:statuses scope' do + let(:scopes) { 'read:statuses' } + + it 'can subscribing to the public:local channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('public:local') + + # We need to publish a status as there is no positive acknowledgement of + # subscriptions: + status = PostStatusService.new.call(bob_account, text: 'Hello @alice') + + # And then we want to receive that status: + message = streaming_client.wait_for_message + + expect(message).to include( + stream: be_an(Array).and(contain_exactly('public:local')), + event: 'update', + payload: include( + id: status.id.to_s + ) + ) + end + + it 'cannot subscribing to the user:notifications channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('user:notification') + + # We should receive an error back immediately: + message = streaming_client.wait_for_message + + expect(message).to include( + error: 'Access token does not have the required scopes', + status: 401 + ) + end + end + + context 'when the access token has read:notifications scope' do + let(:scopes) { 'read:notifications' } + + it 'can subscribing to the user:notifications channel' do + streaming_client.authenticate(access_token.token) + + streaming_client.connect + streaming_client.subscribe('user:notification') + + # We need to perform an action that triggers a notification as there is + # no positive acknowledgement of subscriptions: + first_status = PostStatusService.new.call(user_account, text: 'Test') + ReblogService.new.call(bob_account, first_status) + + message = streaming_client.wait_for_message + + expect(message).to include( + event: 'notification', + stream: ['user:notification'] + ) + end + end +end diff --git a/spec/system/streaming/streaming_spec.rb b/spec/system/streaming/streaming_spec.rb new file mode 100644 index 00000000000000..f5d3ba114265ae --- /dev/null +++ b/spec/system/streaming/streaming_spec.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +require 'rails_helper' +RSpec.describe 'Streaming', :inline_jobs, :streaming do + let(:authentication_method) { StreamingClient::AUTHENTICATION::SUBPROTOCOL } + let(:user) { Fabricate(:user) } + let(:scopes) { '' } + let(:application) { Fabricate(:application, confidential: false) } + let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, application: application, scopes: scopes) } + let(:access_token) { token.token } + + before do + streaming_client.authenticate(access_token, authentication_method) + end + + after do + streaming_client.close + end + + context 'when authenticating via subprotocol' do + it 'is able to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via authorization header' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::AUTHORIZATION_HEADER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'when authenticating via query parameter' do + let(:authentication_method) { StreamingClient::AUTHENTICATION::QUERY_PARAMETER } + + it 'is able to connect successfully' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + end + end + + context 'with a revoked access token' do + before do + token.revoke + end + + it 'receives an 401 unauthorized error' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when revoking an access token after connection' do + it 'disconnects the client' do + streaming_client.connect + + expect(streaming_client.status).to eq(101) + expect(streaming_client.open?).to be(true) + + token.revoke + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end + + context 'with a disabled user account' do + before do + user.disable! + end + + it 'receives an 401 unauthorized error when trying to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when the user account is disabled whilst connected' do + it 'terminates the connection for the user' do + streaming_client.connect + + user.disable! + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end + + context 'with a suspended user account' do + before do + user.account.suspend! + end + + it 'receives an 401 unauthorized error when trying to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when the user account is suspended whilst connected' do + it 'terminates the connection for the user' do + streaming_client.connect + + user.account.suspend! + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end +end diff --git a/streaming/index.js b/streaming/index.js index 29e00ac941e7a1..17e4d5f1f2e3a2 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -91,17 +91,6 @@ const parseJSON = (json, req) => { } }; -const PUBLIC_CHANNELS = [ - 'public', - 'public:media', - 'public:local', - 'public:local:media', - 'public:remote', - 'public:remote:media', - 'hashtag', - 'hashtag:local', -]; - // Used for priming the counters/gauges for the various metrics that are // per-channel const CHANNEL_NAMES = [ @@ -109,8 +98,16 @@ const CHANNEL_NAMES = [ 'user', 'user:notification', 'list', + 'antenna', 'direct', - ...PUBLIC_CHANNELS, + 'public', + 'public:media', + 'public:local', + 'public:local:media', + 'public:remote', + 'public:remote:media', + 'hashtag', + 'hashtag:local', ]; const startServer = async () => { @@ -398,7 +395,7 @@ const startServer = async () => { */ const accountFromToken = async (token, req) => { const result = await pgPool.query( - 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', + 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token], ); @@ -486,12 +483,6 @@ const startServer = async () => { new Promise((resolve, reject) => { logger.debug(`Checking OAuth scopes for ${channelName}`); - // When accessing public channels, no scopes are needed - if (channelName && PUBLIC_CHANNELS.includes(channelName)) { - resolve(); - return; - } - // The `read` scope has the highest priority, if the token has it // then it can access all streams const requiredScopes = ['read'];