Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions packages/core/lib/flush_policies/file_size_flush_policy.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import 'package:flutter/foundation.dart';
import 'package:segment_analytics/event.dart';
import 'package:segment_analytics/flush_policies/flush_policy.dart';

/// Flush policy that triggers when the current file size exceeds a threshold.
///
/// This policy works in conjunction with file rotation to ensure files
/// don't grow too large before being uploaded.
class FileSizeFlushPolicy extends FlushPolicy {
final int _maxFileSize;
int _estimatedCurrentSize = 0;

/// Creates a flush policy that triggers when file size exceeds maxFileSize
///
/// @param maxFileSize Maximum file size in bytes before triggering flush
FileSizeFlushPolicy(this._maxFileSize);

@visibleForTesting
int get estimatedCurrentSize => _estimatedCurrentSize;

@visibleForTesting
int get maxFileSize => _maxFileSize;

@override
void start() {
_estimatedCurrentSize = 0;
}

@override
onEvent(RawEvent event) {
// Estimate the serialized size of the event
final eventSize = _estimateEventSize(event);
_estimatedCurrentSize += eventSize;

if (_estimatedCurrentSize >= _maxFileSize) {
shouldFlush = true;
}
}

@override
reset() {
super.reset();
_estimatedCurrentSize = 0;
}

/// Estimate the serialized size of an event in bytes
int _estimateEventSize(RawEvent event) {
// Base size estimate for different event types
const baseEventSize = 200; // Basic event structure

int size = baseEventSize;

// Add size based on event type and properties
if (event is TrackEvent) {
size += event.event.length * 2; // Event name (UTF-8 can be up to 2 bytes per char)
size += _estimatePropertiesSize(event.properties);
} else if (event is ScreenEvent) {
size += event.name.length * 2;
size += _estimatePropertiesSize(event.properties);
} else if (event is IdentifyEvent) {
size += (event.userId?.length ?? 0) * 2;
size += _estimateUserTraitsSize(event.traits);
} else if (event is GroupEvent) {
size += event.groupId.length * 2;
size += _estimateGroupTraitsSize(event.traits);
} else if (event is AliasEvent) {
size += (event.previousId.length * 2).toInt();
}

// Add context size estimate
size += 500; // Estimated context size

return size;
}

/// Estimate the size of properties map
int _estimatePropertiesSize(Map<String, dynamic>? properties) {
if (properties == null || properties.isEmpty) return 0;

int size = 0;
properties.forEach((key, value) {
size += key.length * 2; // Key size
size += _estimateValueSize(value);
});

return size;
}

/// Estimate the size of user traits
int _estimateUserTraitsSize(UserTraits? traits) {
if (traits == null) return 0;

// This would need to be implemented based on UserTraits structure
// For now, provide a reasonable estimate
return 100; // Base estimate for user traits
}

/// Estimate the size of group traits
int _estimateGroupTraitsSize(GroupTraits? traits) {
if (traits == null) return 0;

// This would need to be implemented based on GroupTraits structure
// For now, provide a reasonable estimate
return 100; // Base estimate for group traits
}

/// Estimate the size of a dynamic value
int _estimateValueSize(dynamic value) {
if (value == null) return 4; // "null"

if (value is String) {
return value.length * 2 + 2; // String content + quotes
} else if (value is num) {
return 20; // Reasonable estimate for numbers
} else if (value is bool) {
return 5; // "true" or "false"
} else if (value is List) {
int size = 2; // []
for (var item in value) {
size += _estimateValueSize(item) + 1; // Item + comma
}
return size;
} else if (value is Map) {
int size = 2; // {}
value.forEach((key, val) {
size += key.toString().length * 2 + 3; // Key + quotes + colon
size += _estimateValueSize(val) + 1; // Value + comma
});
return size;
}

// Fallback for other types
return value.toString().length * 2;
}

/// Manually update the estimated file size (for external size tracking)
void updateEstimatedSize(int newSize) {
_estimatedCurrentSize = newSize;
if (_estimatedCurrentSize >= _maxFileSize) {
shouldFlush = true;
}
}

/// Add to the estimated size
void addEstimatedSize(int additionalSize) {
_estimatedCurrentSize += additionalSize;
if (_estimatedCurrentSize >= _maxFileSize) {
shouldFlush = true;
}
}
}
212 changes: 212 additions & 0 deletions packages/core/lib/plugins/queue_flushing_plugin_with_rotation.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import 'package:segment_analytics/analytics.dart';
import 'package:segment_analytics/event.dart';
import 'package:segment_analytics/plugin.dart';
import 'package:segment_analytics/state.dart';
import 'package:segment_analytics/logger.dart';
import 'package:segment_analytics/utils/store/store.dart';
import '../storage/file_rotation_config.dart';
import '../storage/file_rotation_manager.dart';

typedef OnFlush = Future Function(List<RawEvent> events);

/// Enhanced queue flushing plugin with file rotation support.
///
/// This plugin extends the original QueueFlushingPlugin to support automatic
/// file rotation when storage files exceed the maximum size limit.
class QueueFlushingPluginWithRotation extends UtilityPlugin {
QueueStateWithRotation<RawEvent>? _state;

bool _isPendingUpload = false;
final OnFlush _onFlush;
final FileRotationConfig _rotationConfig;

/// Creates a queue flushing plugin with file rotation support
///
/// @param onFlush callback to execute when the queue is flushed
/// @param rotationConfig configuration for file rotation behavior
QueueFlushingPluginWithRotation(
this._onFlush, {
FileRotationConfig? rotationConfig,
}) : _rotationConfig = rotationConfig ?? const FileRotationConfig(),
super(PluginType.after);

@override
configure(Analytics analytics) {
super.configure(analytics);

_state = QueueStateWithRotation(
"queue_flushing_plugin",
analytics.store,
(json) => eventFromJson(json),
_rotationConfig,
);

_state!.init(analytics.error, true);
}

@override
Future<RawEvent> execute(RawEvent event) async {
await _state!.ready;
await _state!.add(event);
return event;
}

/// Calls the onFlush callback with the events in the queue
@override
flush() async {
if (_state != null) {
await _state!.ready;
final events = await _state!.state;
try {
if (!_isPendingUpload && events.isNotEmpty) {
_isPendingUpload = true;
await _onFlush(events);
}
} finally {
_isPendingUpload = false;
}
}
}

/// Removes one or multiple events from the queue
/// @param events events to remove
Future dequeue(List<RawEvent> eventsToRemove) async {
await _state!.ready;
final events = await _state!.events;
for (var event in eventsToRemove) {
events.remove(event);
}
_state!.setEvents(events);
}

/// Get file rotation debug information
Future<Map<String, dynamic>> getRotationDebugInfo() async {
if (_state == null) return {};
return await _state!.getRotationDebugInfo();
}

/// Manually trigger file rotation (for testing)
Future<void> triggerRotation() async {
if (_state != null) {
await _state!.triggerRotation();
}
}
}

/// Enhanced queue state that supports file rotation
class QueueStateWithRotation<T extends JSONSerialisable> extends PersistedState<List<T>> {
final T Function(Map<String, dynamic> json) _elementFromJson;
final FileRotationConfig _rotationConfig;
FileRotationManager? _rotationManager;

QueueStateWithRotation(
String key,
Store store,
this._elementFromJson,
this._rotationConfig,
) : super(key, store, () async => []);

@override
void init(ErrorHandler errorHandler, bool storageJson) {
// Initialize rotation manager if enabled
if (_rotationConfig.enabled) {
// Get storage directory path from store implementation
_getStoragePath().then((storePath) async {
_rotationManager = FileRotationManager(_rotationConfig, storePath);
await _rotationManager!.ready;
});
}

// Call parent initialization
super.init(errorHandler, storageJson);
}

/// Get the storage path from the store implementation
Future<String> _getStoragePath() async {
// This is a simplified approach - in a real implementation,
// you'd need to extract the actual path from the store
// For now, we'll use a reasonable default
try {
// Try to get documents directory (platform-specific)
// This would need to be implemented based on the actual Store interface
return '/tmp/segment_analytics'; // Fallback path
} catch (e) {
log('Could not determine storage path, using fallback: $e',
kind: LogFilterKind.warning);
return '/tmp/segment_analytics';
}
}

Future add(T t) async {
await modifyState((state) async {
// Check if file rotation is needed before adding
if (_rotationConfig.enabled && _rotationManager != null) {
await _checkAndRotateIfNeeded([t]);
}

setState([...state, t]);
});
}

/// Check if rotation is needed and perform it if necessary
Future<void> _checkAndRotateIfNeeded(List<T> newEvents) async {
if (_rotationManager == null) return;

try {
// Convert to RawEvent list (assuming T extends RawEvent for our use case)
final events = newEvents.whereType<RawEvent>().toList();
if (events.isEmpty) return;

final targetFilePath = await _rotationManager!.checkRotationNeeded(events);

// Update file size tracking
_rotationManager!.updateFileSize(targetFilePath, events);

} catch (e) {
log('Error during rotation check: $e', kind: LogFilterKind.error);
}
}

/// Manually trigger rotation for testing
Future<void> triggerRotation() async {
if (_rotationManager != null) {
await _rotationManager!.finishCurrentFile();
}
}

/// Get rotation debug information
Future<Map<String, dynamic>> getRotationDebugInfo() async {
if (_rotationManager == null) {
return {'rotationEnabled': false};
}

return {
'rotationEnabled': true,
...(await _rotationManager!.getDebugInfo()),
};
}

@override
List<T> fromJson(Map<String, dynamic> json) {
final rawList = json['queue'] as List<dynamic>;
return rawList.map((e) => _elementFromJson(e)).toList();
}

@override
Map<String, dynamic> toJson(List<T> t) {
return {"queue": t.map((e) => e.toJson()).toList()};
}

Future<List<T>> get events => state;
void setEvents(List<T> events) => setState([...events]);

Future<void> flush({int? number}) async {
final events = await state;
if (number == null || number >= events.length) {
setState([]);
return;
}
events.removeRange(0, number);
setEvents(events);
}
}
Loading