Skip to content

Commit

Permalink
Mark overloads without a schema as obsolete and add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Jan 21, 2025
1 parent 887b7e4 commit 4b96c0d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
18 changes: 18 additions & 0 deletions csharp/src/Apache.Arrow.Flight/Client/FlightClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,25 @@ public AsyncUnaryCall<FlightInfo> GetInfo(FlightDescriptor flightDescriptor, Met
flightInfoResult.Dispose);
}

[System.Obsolete("Use an async overload that takes a Schema")]
public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers = null)
{
return StartPut(flightDescriptor, headers, null, CancellationToken.None);
}

/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="schema">The schema of the data</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
public Task<FlightRecordBatchDuplexStreamingCall> StartPut(FlightDescriptor flightDescriptor, Schema schema, Metadata headers = null)
{
return StartPut(flightDescriptor, schema, headers, null, CancellationToken.None);
}

[System.Obsolete("Use an async overload that takes a Schema")]
public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDescriptor, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channels = _client.DoPut(headers, deadline, cancellationToken);
Expand All @@ -122,6 +131,15 @@ public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDesc
channels.Dispose);
}

/// <summary>
/// Start a Flight Put request.
/// </summary>
/// <param name="flightDescriptor">Descriptor for the data to be put</param>
/// <param name="schema">The schema of the data</param>
/// <param name="headers">gRPC headers to send with the request</param>
/// <param name="deadline">Optional deadline. The request will be cancelled if this deadline is reached.</param>
/// <param name="cancellationToken">Optional token for cancelling the request</param>
/// <returns>A <see cref="FlightRecordBatchDuplexStreamingCall" /> object used to write data batches and receive responses</returns>
public async Task<FlightRecordBatchDuplexStreamingCall> StartPut(FlightDescriptor flightDescriptor, Schema schema, Metadata headers, System.DateTime? deadline, CancellationToken cancellationToken = default)
{
var channels = _client.DoPut(headers, deadline, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ private protected FlightRecordBatchStreamWriter(IAsyncStreamWriter<Protocol.Flig
_flightDescriptor = flightDescriptor;
}

/// <summary>
/// Configure the data stream to write to.
/// </summary>
/// <remarks>
/// The stream will be set up automatically when writing a RecordBatch if required,
/// but calling this method before writing any data allows handling empty streams.
/// </remarks>
/// <param name="schema">The schema of data to be written to this stream</param>
public async Task SetupStream(Schema schema)
{
if (_flightDataStream != null)
Expand Down
12 changes: 5 additions & 7 deletions csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public async Task TestPutSingleRecordBatch()
var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
var expectedBatch = CreateTestBatch(0, 100);

var putStream = _flightClient.StartPut(flightDescriptor);
var putStream = await _flightClient.StartPut(flightDescriptor, expectedBatch.Schema);
await putStream.RequestStream.WriteAsync(expectedBatch);
await putStream.RequestStream.CompleteAsync();
var putResults = await putStream.ResponseStream.ToListAsync();
Expand All @@ -115,7 +115,7 @@ public async Task TestPutTwoRecordBatches()
var expectedBatch1 = CreateTestBatch(0, 100);
var expectedBatch2 = CreateTestBatch(0, 100);

var putStream = _flightClient.StartPut(flightDescriptor);
var putStream = await _flightClient.StartPut(flightDescriptor, expectedBatch1.Schema);
await putStream.RequestStream.WriteAsync(expectedBatch1);
await putStream.RequestStream.WriteAsync(expectedBatch2);
await putStream.RequestStream.CompleteAsync();
Expand Down Expand Up @@ -254,7 +254,7 @@ public async Task TestPutWithMetadata()
var expectedBatch = CreateTestBatch(0, 100);
var expectedMetadata = ByteString.CopyFromUtf8("test metadata");

var putStream = _flightClient.StartPut(flightDescriptor);
var putStream = await _flightClient.StartPut(flightDescriptor, expectedBatch.Schema);
await putStream.RequestStream.WriteAsync(expectedBatch, expectedMetadata);
await putStream.RequestStream.CompleteAsync();
var putResults = await putStream.ResponseStream.ToListAsync();
Expand Down Expand Up @@ -495,8 +495,7 @@ public async Task EnsureCallRaisesDeadlineExceeded()
exception = await Assert.ThrowsAsync<RpcException>(async () => await duplexStreamingCall.RequestStream.WriteAsync(batch));
Assert.Equal(StatusCode.DeadlineExceeded, exception.StatusCode);

var putStream = _flightClient.StartPut(flightDescriptor, null, deadline);
exception = await Assert.ThrowsAsync<RpcException>(async () => await putStream.RequestStream.WriteAsync(batch));
exception = await Assert.ThrowsAsync<RpcException>(async () => await _flightClient.StartPut(flightDescriptor, batch.Schema, null, deadline));
Assert.Equal(StatusCode.DeadlineExceeded, exception.StatusCode);

exception = await Assert.ThrowsAsync<RpcException>(async () => await _flightClient.GetSchema(flightDescriptor, null, deadline));
Expand Down Expand Up @@ -538,8 +537,7 @@ public async Task EnsureCallRaisesRequestCancelled()
exception = await Assert.ThrowsAsync<RpcException>(async () => await duplexStreamingCall.RequestStream.WriteAsync(batch));
Assert.Equal(StatusCode.Cancelled, exception.StatusCode);

var putStream = _flightClient.StartPut(flightDescriptor, null, null, cts.Token);
exception = await Assert.ThrowsAsync<RpcException>(async () => await putStream.RequestStream.WriteAsync(batch));
exception = await Assert.ThrowsAsync<RpcException>(async () => await _flightClient.StartPut(flightDescriptor, batch.Schema, null, null, cts.Token));
Assert.Equal(StatusCode.Cancelled, exception.StatusCode);

exception = await Assert.ThrowsAsync<RpcException>(async () => await _flightClient.GetSchema(flightDescriptor, null, null, cts.Token));
Expand Down

0 comments on commit 4b96c0d

Please sign in to comment.