Skip to content

Commit 69c0abb

Browse files
committed
Refactor .NET bindings for OpenDAL
- Removed OpenDALMethods.cs as it is no longer needed. - Updated OpenDALResult to simplify error handling and introduced OpenDALIntPtrResult and OpenDALByteBufferResult for better type safety. - Refactored Operator class to utilize new result structures and improved error handling. - Enhanced async methods to properly handle results and errors. - Added ByteBuffer struct for FFI-safe byte buffer representation. - Updated error handling in Rust code to align with new result structures. - Improved utility functions for memory management and string conversion. - Updated README to reflect changes in error semantics and usage.
1 parent 04b2f64 commit 69c0abb

16 files changed

Lines changed: 957 additions & 453 deletions

bindings/dotnet/DotOpenDAL.Tests/MemoryOperatorTest.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,36 @@ namespace DotOpenDAL.Tests;
2121

2222
public class MemoryOperatorTest
2323
{
24+
[Fact]
25+
public async Task ReadWrite_DisposeRace_DoesNotCrashProcess()
26+
{
27+
var op = new Operator("memory");
28+
byte[] content = [0x10, 0x20, 0x30, 0x40];
29+
op.Write("seed", content);
30+
31+
var workers = Enumerable.Range(0, 64).Select(async i =>
32+
{
33+
var path = $"race-{i % 8}";
34+
35+
try
36+
{
37+
op.Write(path, content);
38+
_ = op.Read(path);
39+
}
40+
catch (ObjectDisposedException)
41+
{
42+
}
43+
catch (OpenDALException)
44+
{
45+
}
46+
47+
await Task.Yield();
48+
});
49+
50+
var dispose = Task.Run(op.Dispose);
51+
await Task.WhenAll(workers.Append(dispose));
52+
}
53+
2454
[Fact]
2555
public void ReadWrite_Utf8Bytes_RoundTripsSuccessfully()
2656
{
@@ -182,4 +212,49 @@ public async Task ReadWriteAsync_CancelAfterDispatch_DoesNotBreakSubsequentOpera
182212
var stableRead = await op.ReadAsync("seed");
183213
Assert.Equal("seed-content", System.Text.Encoding.UTF8.GetString(stableRead));
184214
}
215+
216+
[Fact]
217+
public async Task ReadWriteAsync_DisposeRace_DoesNotCrashProcess()
218+
{
219+
var op = new Operator("memory");
220+
byte[] content = [1, 2, 3, 4, 5, 6, 7, 8];
221+
await op.WriteAsync("seed-async", content);
222+
223+
var workers = Enumerable.Range(0, 64).Select(async i =>
224+
{
225+
var path = $"race-async-{i % 8}";
226+
227+
try
228+
{
229+
await op.WriteAsync(path, content);
230+
_ = await op.ReadAsync(path);
231+
}
232+
catch (ObjectDisposedException)
233+
{
234+
}
235+
catch (OpenDALException)
236+
{
237+
}
238+
});
239+
240+
var dispose = Task.Run(op.Dispose);
241+
await Task.WhenAll(workers.Append(dispose));
242+
}
243+
244+
[Fact]
245+
public async Task ReadAsync_PathNotExists_RepeatedErrors_DoNotPoisonSubsequentCalls()
246+
{
247+
using var op = new Operator("memory");
248+
249+
for (var i = 0; i < 32; i++)
250+
{
251+
var ex = await Assert.ThrowsAsync<OpenDALException>(() => op.ReadAsync($"missing-{i}"));
252+
Assert.Equal(ErrorCode.NotFound, ex.Code);
253+
}
254+
255+
var content = System.Text.Encoding.UTF8.GetBytes("healthy");
256+
await op.WriteAsync("healthy", content);
257+
var read = await op.ReadAsync("healthy");
258+
Assert.Equal("healthy", System.Text.Encoding.UTF8.GetString(read));
259+
}
185260
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
namespace DotOpenDAL;
21+
22+
using System.Collections.Concurrent;
23+
using System.Diagnostics.CodeAnalysis;
24+
25+
/// <summary>
26+
/// Marker interface for async operation state tracked by the callback registry.
27+
/// </summary>
28+
public interface IAsyncState;
29+
30+
internal static class AsyncStateRegistry
31+
{
32+
private static long nextAsyncStateId;
33+
private static readonly ConcurrentDictionary<long, IAsyncState> AsyncStates = new();
34+
35+
public static long Register(IAsyncState state)
36+
{
37+
while (true)
38+
{
39+
var id = Interlocked.Increment(ref nextAsyncStateId);
40+
if (id == 0)
41+
{
42+
continue;
43+
}
44+
45+
if (AsyncStates.TryAdd(id, state))
46+
{
47+
return id;
48+
}
49+
}
50+
}
51+
52+
public static void Unregister(long context)
53+
{
54+
AsyncStates.TryRemove(context, out _);
55+
}
56+
57+
public static bool TryTake<TState>(IntPtr context, [NotNullWhen(true)] out TState? state) where TState : class
58+
{
59+
state = null;
60+
var key = context.ToInt64();
61+
if (!AsyncStates.TryRemove(key, out var value))
62+
{
63+
return false;
64+
}
65+
66+
state = value as TState;
67+
return state is not null;
68+
}
69+
}
70+
71+
/// <summary>
72+
/// State object for a pending asynchronous write operation.
73+
/// </summary>
74+
public sealed class WriteAsyncState : IAsyncState
75+
{
76+
/// <summary>
77+
/// Completion source that resolves when the native write callback arrives.
78+
/// </summary>
79+
public TaskCompletionSource Completion { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
80+
81+
/// <summary>
82+
/// Registration used to detach cancellation callbacks when the operation completes.
83+
/// </summary>
84+
public CancellationTokenRegistration CancellationRegistration { get; set; }
85+
}
86+
87+
/// <summary>
88+
/// State object for a pending asynchronous read operation.
89+
/// </summary>
90+
public sealed class ReadAsyncState : IAsyncState
91+
{
92+
/// <summary>
93+
/// Completion source that resolves with bytes when the native read callback arrives.
94+
/// </summary>
95+
public TaskCompletionSource<byte[]> Completion { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
96+
97+
/// <summary>
98+
/// Registration used to detach cancellation callbacks when the operation completes.
99+
/// </summary>
100+
public CancellationTokenRegistration CancellationRegistration { get; set; }
101+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System.Runtime.InteropServices;
21+
22+
namespace DotOpenDAL;
23+
24+
[StructLayout(LayoutKind.Sequential)]
25+
/// <summary>
26+
/// FFI representation of a Rust byte buffer.
27+
/// </summary>
28+
public struct ByteBuffer
29+
{
30+
/// <summary>
31+
/// Pointer to the first byte in unmanaged memory.
32+
/// </summary>
33+
public IntPtr Data;
34+
35+
/// <summary>
36+
/// Number of valid bytes in <see cref="Data"/>.
37+
/// </summary>
38+
public nuint Len;
39+
40+
/// <summary>
41+
/// Total allocated capacity in bytes.
42+
/// </summary>
43+
public nuint Capacity;
44+
45+
/// <summary>
46+
/// Releases the unmanaged buffer allocated by the Rust binding.
47+
/// </summary>
48+
public readonly void Release()
49+
{
50+
if (Data == IntPtr.Zero)
51+
{
52+
return;
53+
}
54+
55+
if (Capacity == 0 || Capacity < Len)
56+
{
57+
return;
58+
}
59+
60+
NativeMethods.buffer_free(Data, Len, Capacity);
61+
}
62+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
using System.Runtime.InteropServices;
21+
using System.Runtime.CompilerServices;
22+
23+
namespace DotOpenDAL;
24+
25+
internal partial class NativeMethods
26+
{
27+
const string __DllName = "opendal_dotnet";
28+
29+
[LibraryImport(__DllName, EntryPoint = "operator_construct", StringMarshalling = StringMarshalling.Utf8)]
30+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
31+
internal static unsafe partial OpenDALIntPtrResult operator_construct(
32+
string scheme,
33+
IntPtr* keys,
34+
IntPtr* values,
35+
nuint len);
36+
37+
[LibraryImport(__DllName, EntryPoint = "operator_free")]
38+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
39+
internal static partial void operator_free(IntPtr op);
40+
41+
[LibraryImport(__DllName, EntryPoint = "operator_write", StringMarshalling = StringMarshalling.Utf8)]
42+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
43+
internal static unsafe partial OpenDALResult operator_write(Operator op, string path, byte* data, nuint len);
44+
45+
[LibraryImport(__DllName, EntryPoint = "operator_read", StringMarshalling = StringMarshalling.Utf8)]
46+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
47+
internal static partial OpenDALByteBufferResult operator_read(Operator op, string path);
48+
49+
[LibraryImport(__DllName, EntryPoint = "operator_write_async", StringMarshalling = StringMarshalling.Utf8)]
50+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
51+
internal static unsafe partial OpenDALResult operator_write_async(
52+
Operator op,
53+
string path,
54+
byte* data,
55+
nuint len,
56+
delegate* unmanaged[Cdecl]<IntPtr, OpenDALResult, void> callback,
57+
IntPtr context);
58+
59+
[LibraryImport(__DllName, EntryPoint = "operator_read_async", StringMarshalling = StringMarshalling.Utf8)]
60+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
61+
internal static unsafe partial OpenDALResult operator_read_async(
62+
Operator op,
63+
string path,
64+
delegate* unmanaged[Cdecl]<IntPtr, OpenDALByteBufferResult, void> callback,
65+
IntPtr context);
66+
67+
[LibraryImport(__DllName, EntryPoint = "message_free")]
68+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
69+
internal static partial void message_free(IntPtr message);
70+
71+
[LibraryImport(__DllName, EntryPoint = "buffer_free")]
72+
[UnmanagedCallConv(CallConvs = [typeof(CallConvCdecl)])]
73+
internal static partial void buffer_free(IntPtr data, nuint len, nuint capacity);
74+
}

0 commit comments

Comments
 (0)